diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index c362864b6..47468b357 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -1171,7 +1171,7 @@ impl CodexMessageProcessor { let exec_params = ExecParams { command: params.command, cwd, - timeout_ms, + expiration: timeout_ms.into(), env, with_escalated_permissions: None, justification: None, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8b8292cd1..098db0c4f 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3051,6 +3051,7 @@ mod tests { let session = Arc::new(session); let mut turn_context = Arc::new(turn_context_raw); + let timeout_ms = 1000; let params = ExecParams { command: if cfg!(windows) { vec![ @@ -3066,7 +3067,7 @@ mod tests { ] }, cwd: turn_context.cwd.clone(), - timeout_ms: Some(1000), + expiration: timeout_ms.into(), env: HashMap::new(), with_escalated_permissions: Some(true), justification: Some("test".to_string()), @@ -3075,7 +3076,12 @@ mod tests { let params2 = ExecParams { with_escalated_permissions: Some(false), - ..params.clone() + command: params.command.clone(), + cwd: params.cwd.clone(), + expiration: timeout_ms.into(), + env: HashMap::new(), + justification: params.justification.clone(), + arg0: None, }; let turn_diff_tracker = Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())); @@ -3095,7 +3101,7 @@ mod tests { arguments: serde_json::json!({ "command": params.command.clone(), "workdir": Some(turn_context.cwd.to_string_lossy().to_string()), - "timeout_ms": params.timeout_ms, + "timeout_ms": params.expiration.timeout_ms(), "with_escalated_permissions": params.with_escalated_permissions, "justification": params.justification.clone(), }) @@ -3132,7 +3138,7 @@ mod tests { arguments: serde_json::json!({ "command": params2.command.clone(), "workdir": Some(turn_context.cwd.to_string_lossy().to_string()), - "timeout_ms": params2.timeout_ms, + "timeout_ms": params2.expiration.timeout_ms(), "with_escalated_permissions": params2.with_escalated_permissions, "justification": params2.justification.clone(), }) diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index 0378f5ddf..42576907e 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -14,6 +14,7 @@ use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; use tokio::io::BufReader; use tokio::process::Child; +use tokio_util::sync::CancellationToken; use crate::error::CodexErr; use crate::error::Result; @@ -47,20 +48,59 @@ const AGGREGATE_BUFFER_INITIAL_CAPACITY: usize = 8 * 1024; // 8 KiB /// Aggregation still collects full output; only the live event stream is capped. pub(crate) const MAX_EXEC_OUTPUT_DELTAS_PER_CALL: usize = 10_000; -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct ExecParams { pub command: Vec, pub cwd: PathBuf, - pub timeout_ms: Option, + pub expiration: ExecExpiration, pub env: HashMap, pub with_escalated_permissions: Option, pub justification: Option, pub arg0: Option, } -impl ExecParams { - pub fn timeout_duration(&self) -> Duration { - Duration::from_millis(self.timeout_ms.unwrap_or(DEFAULT_TIMEOUT_MS)) +/// Mechanism to terminate an exec invocation before it finishes naturally. +#[derive(Debug)] +pub enum ExecExpiration { + Timeout(Duration), + DefaultTimeout, + Cancellation(CancellationToken), +} + +impl From> for ExecExpiration { + fn from(timeout_ms: Option) -> Self { + timeout_ms.map_or(ExecExpiration::DefaultTimeout, |timeout_ms| { + ExecExpiration::Timeout(Duration::from_millis(timeout_ms)) + }) + } +} + +impl From for ExecExpiration { + fn from(timeout_ms: u64) -> Self { + ExecExpiration::Timeout(Duration::from_millis(timeout_ms)) + } +} + +impl ExecExpiration { + async fn wait(self) { + match self { + ExecExpiration::Timeout(duration) => tokio::time::sleep(duration).await, + ExecExpiration::DefaultTimeout => { + tokio::time::sleep(Duration::from_millis(DEFAULT_TIMEOUT_MS)).await + } + ExecExpiration::Cancellation(cancel) => { + cancel.cancelled().await; + } + } + } + + /// If ExecExpiration is a timeout, returns the timeout in milliseconds. + pub(crate) fn timeout_ms(&self) -> Option { + match self { + ExecExpiration::Timeout(duration) => Some(duration.as_millis() as u64), + ExecExpiration::DefaultTimeout => Some(DEFAULT_TIMEOUT_MS), + ExecExpiration::Cancellation(_) => None, + } } } @@ -96,7 +136,7 @@ pub async fn process_exec_tool_call( let ExecParams { command, cwd, - timeout_ms, + expiration, env, with_escalated_permissions, justification, @@ -115,7 +155,7 @@ pub async fn process_exec_tool_call( args: args.to_vec(), cwd, env, - timeout_ms, + expiration, with_escalated_permissions, justification, }; @@ -123,7 +163,7 @@ pub async fn process_exec_tool_call( let manager = SandboxManager::new(); let exec_env = manager .transform( - &spec, + spec, sandbox_policy, sandbox_type, sandbox_cwd, @@ -132,7 +172,7 @@ pub async fn process_exec_tool_call( .map_err(CodexErr::from)?; // Route through the sandboxing module for a single, unified execution path. - crate::sandboxing::execute_env(&exec_env, sandbox_policy, stdout_stream).await + crate::sandboxing::execute_env(exec_env, sandbox_policy, stdout_stream).await } pub(crate) async fn execute_exec_env( @@ -144,7 +184,7 @@ pub(crate) async fn execute_exec_env( command, cwd, env, - timeout_ms, + expiration, sandbox, with_escalated_permissions, justification, @@ -154,7 +194,7 @@ pub(crate) async fn execute_exec_env( let params = ExecParams { command, cwd, - timeout_ms, + expiration, env, with_escalated_permissions, justification, @@ -179,9 +219,12 @@ async fn exec_windows_sandbox( command, cwd, env, - timeout_ms, + expiration, .. } = params; + // TODO(iceweasel-oai): run_windows_sandbox_capture should support all + // variants of ExecExpiration, not just timeout. + let timeout_ms = expiration.timeout_ms(); let policy_str = serde_json::to_string(sandbox_policy).map_err(|err| { CodexErr::Io(io::Error::other(format!( @@ -449,12 +492,12 @@ async fn exec( { return exec_windows_sandbox(params, sandbox_policy).await; } - let timeout = params.timeout_duration(); let ExecParams { command, cwd, env, arg0, + expiration, .. } = params; @@ -475,14 +518,14 @@ async fn exec( env, ) .await?; - consume_truncated_output(child, timeout, stdout_stream).await + consume_truncated_output(child, expiration, stdout_stream).await } /// Consumes the output of a child process, truncating it so it is suitable for /// use as the output of a `shell` tool call. Also enforces specified timeout. async fn consume_truncated_output( mut child: Child, - timeout: Duration, + expiration: ExecExpiration, stdout_stream: Option, ) -> Result { // Both stdout and stderr were configured with `Stdio::piped()` @@ -516,20 +559,14 @@ async fn consume_truncated_output( )); let (exit_status, timed_out) = tokio::select! { - result = tokio::time::timeout(timeout, child.wait()) => { - match result { - Ok(status_result) => { - let exit_status = status_result?; - (exit_status, false) - } - Err(_) => { - // timeout - kill_child_process_group(&mut child)?; - child.start_kill()?; - // Debatable whether `child.wait().await` should be called here. - (synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true) - } - } + status_result = child.wait() => { + let exit_status = status_result?; + (exit_status, false) + } + _ = expiration.wait() => { + kill_child_process_group(&mut child)?; + child.start_kill()?; + (synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true) } _ = tokio::signal::ctrl_c() => { kill_child_process_group(&mut child)?; @@ -799,7 +836,7 @@ mod tests { let params = ExecParams { command, cwd: std::env::current_dir()?, - timeout_ms: Some(500), + expiration: 500.into(), env, with_escalated_permissions: None, justification: None, @@ -833,4 +870,62 @@ mod tests { assert!(killed, "grandchild process with pid {pid} is still alive"); Ok(()) } + + #[tokio::test] + async fn process_exec_tool_call_respects_cancellation_token() -> Result<()> { + let command = long_running_command(); + let cwd = std::env::current_dir()?; + let env: HashMap = std::env::vars().collect(); + let cancel_token = CancellationToken::new(); + let cancel_tx = cancel_token.clone(); + let params = ExecParams { + command, + cwd: cwd.clone(), + expiration: ExecExpiration::Cancellation(cancel_token), + env, + with_escalated_permissions: None, + justification: None, + arg0: None, + }; + tokio::spawn(async move { + tokio::time::sleep(Duration::from_millis(1_000)).await; + cancel_tx.cancel(); + }); + let result = process_exec_tool_call( + params, + SandboxType::None, + &SandboxPolicy::DangerFullAccess, + cwd.as_path(), + &None, + None, + ) + .await; + let output = match result { + Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => output, + other => panic!("expected timeout error, got {other:?}"), + }; + assert!(output.timed_out); + assert_eq!(output.exit_code, EXEC_TIMEOUT_EXIT_CODE); + Ok(()) + } + + #[cfg(unix)] + fn long_running_command() -> Vec { + vec![ + "/bin/sh".to_string(), + "-c".to_string(), + "sleep 30".to_string(), + ] + } + + #[cfg(windows)] + fn long_running_command() -> Vec { + vec![ + "powershell.exe".to_string(), + "-NonInteractive".to_string(), + "-NoLogo".to_string(), + "-Command".to_string(), + "Start-Sleep -Seconds 30".to_string(), + ] + } } diff --git a/codex-rs/core/src/sandboxing/mod.rs b/codex-rs/core/src/sandboxing/mod.rs index 4ecb2a8c1..d43646021 100644 --- a/codex-rs/core/src/sandboxing/mod.rs +++ b/codex-rs/core/src/sandboxing/mod.rs @@ -8,6 +8,7 @@ ready‑to‑spawn environment. pub mod assessment; +use crate::exec::ExecExpiration; use crate::exec::ExecToolCallOutput; use crate::exec::SandboxType; use crate::exec::StdoutStream; @@ -48,23 +49,23 @@ impl From for SandboxPermissions { } } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct CommandSpec { pub program: String, pub args: Vec, pub cwd: PathBuf, pub env: HashMap, - pub timeout_ms: Option, + pub expiration: ExecExpiration, pub with_escalated_permissions: Option, pub justification: Option, } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct ExecEnv { pub command: Vec, pub cwd: PathBuf, pub env: HashMap, - pub timeout_ms: Option, + pub expiration: ExecExpiration, pub sandbox: SandboxType, pub with_escalated_permissions: Option, pub justification: Option, @@ -115,13 +116,13 @@ impl SandboxManager { pub(crate) fn transform( &self, - spec: &CommandSpec, + mut spec: CommandSpec, policy: &SandboxPolicy, sandbox: SandboxType, sandbox_policy_cwd: &Path, codex_linux_sandbox_exe: Option<&PathBuf>, ) -> Result { - let mut env = spec.env.clone(); + let mut env = spec.env; if !policy.has_full_network_access() { env.insert( CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR.to_string(), @@ -130,8 +131,8 @@ impl SandboxManager { } let mut command = Vec::with_capacity(1 + spec.args.len()); - command.push(spec.program.clone()); - command.extend(spec.args.iter().cloned()); + command.push(spec.program); + command.append(&mut spec.args); let (command, sandbox_env, arg0_override) = match sandbox { SandboxType::None => (command, HashMap::new(), None), @@ -176,12 +177,12 @@ impl SandboxManager { Ok(ExecEnv { command, - cwd: spec.cwd.clone(), + cwd: spec.cwd, env, - timeout_ms: spec.timeout_ms, + expiration: spec.expiration, sandbox, with_escalated_permissions: spec.with_escalated_permissions, - justification: spec.justification.clone(), + justification: spec.justification, arg0: arg0_override, }) } @@ -192,9 +193,9 @@ impl SandboxManager { } pub async fn execute_env( - env: &ExecEnv, + env: ExecEnv, policy: &SandboxPolicy, stdout_stream: Option, ) -> crate::error::Result { - execute_exec_env(env.clone(), policy, stdout_stream).await + execute_exec_env(env, policy, stdout_stream).await } diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index 190578c37..32e8a2596 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -95,7 +95,9 @@ impl SessionTask for UserShellCommandTask { command: command.clone(), cwd: cwd.clone(), env: create_env(&turn_context.shell_environment_policy), - timeout_ms: Some(USER_SHELL_TIMEOUT_MS), + // TODO(zhao-oai): Now that we have ExecExpiration::Cancellation, we + // should use that instead of an "arbitrarily large" timeout here. + expiration: USER_SHELL_TIMEOUT_MS.into(), sandbox: SandboxType::None, with_escalated_permissions: None, justification: None, diff --git a/codex-rs/core/src/tools/handlers/shell.rs b/codex-rs/core/src/tools/handlers/shell.rs index 3cdf8af57..99c822fa5 100644 --- a/codex-rs/core/src/tools/handlers/shell.rs +++ b/codex-rs/core/src/tools/handlers/shell.rs @@ -37,7 +37,7 @@ impl ShellHandler { ExecParams { command: params.command, cwd: turn_context.resolve_path(params.workdir.clone()), - timeout_ms: params.timeout_ms, + expiration: params.timeout_ms.into(), env: create_env(&turn_context.shell_environment_policy), with_escalated_permissions: params.with_escalated_permissions, justification: params.justification, @@ -59,7 +59,7 @@ impl ShellCommandHandler { ExecParams { command, cwd: turn_context.resolve_path(params.workdir.clone()), - timeout_ms: params.timeout_ms, + expiration: params.timeout_ms.into(), env: create_env(&turn_context.shell_environment_policy), with_escalated_permissions: params.with_escalated_permissions, justification: params.justification, @@ -243,7 +243,7 @@ impl ShellHandler { let req = ApplyPatchRequest { patch: apply.action.patch.clone(), cwd: apply.action.cwd.clone(), - timeout_ms: exec_params.timeout_ms, + timeout_ms: exec_params.expiration.timeout_ms(), user_explicitly_approved: apply.user_explicitly_approved_this_action, codex_exe: turn.codex_linux_sandbox_exe.clone(), }; @@ -300,7 +300,7 @@ impl ShellHandler { let req = ShellRequest { command: exec_params.command.clone(), cwd: exec_params.cwd.clone(), - timeout_ms: exec_params.timeout_ms, + timeout_ms: exec_params.expiration.timeout_ms(), env: exec_params.env.clone(), with_escalated_permissions: exec_params.with_escalated_permissions, justification: exec_params.justification.clone(), diff --git a/codex-rs/core/src/tools/runtimes/apply_patch.rs b/codex-rs/core/src/tools/runtimes/apply_patch.rs index 0cdddd508..2334f1e71 100644 --- a/codex-rs/core/src/tools/runtimes/apply_patch.rs +++ b/codex-rs/core/src/tools/runtimes/apply_patch.rs @@ -67,7 +67,7 @@ impl ApplyPatchRuntime { program, args: vec![CODEX_APPLY_PATCH_ARG1.to_string(), req.patch.clone()], cwd: req.cwd.clone(), - timeout_ms: req.timeout_ms, + expiration: req.timeout_ms.into(), // Run apply_patch with a minimal environment for determinism and to avoid leaks. env: HashMap::new(), with_escalated_permissions: None, @@ -153,9 +153,9 @@ impl ToolRuntime for ApplyPatchRuntime { ) -> Result { let spec = Self::build_command_spec(req)?; let env = attempt - .env_for(&spec) + .env_for(spec) .map_err(|err| ToolError::Codex(err.into()))?; - let out = execute_env(&env, attempt.policy, Self::stdout_stream(ctx)) + let out = execute_env(env, attempt.policy, Self::stdout_stream(ctx)) .await .map_err(ToolError::Codex)?; Ok(out) diff --git a/codex-rs/core/src/tools/runtimes/mod.rs b/codex-rs/core/src/tools/runtimes/mod.rs index 212163d72..437f4af42 100644 --- a/codex-rs/core/src/tools/runtimes/mod.rs +++ b/codex-rs/core/src/tools/runtimes/mod.rs @@ -4,6 +4,7 @@ Module: runtimes Concrete ToolRuntime implementations for specific tools. Each runtime stays small and focused and reuses the orchestrator for approvals + sandbox + retry. */ +use crate::exec::ExecExpiration; use crate::sandboxing::CommandSpec; use crate::tools::sandboxing::ToolError; use std::collections::HashMap; @@ -19,7 +20,7 @@ pub(crate) fn build_command_spec( command: &[String], cwd: &Path, env: &HashMap, - timeout_ms: Option, + expiration: ExecExpiration, with_escalated_permissions: Option, justification: Option, ) -> Result { @@ -31,7 +32,7 @@ pub(crate) fn build_command_spec( args: args.to_vec(), cwd: cwd.to_path_buf(), env: env.clone(), - timeout_ms, + expiration, with_escalated_permissions, justification, }) diff --git a/codex-rs/core/src/tools/runtimes/shell.rs b/codex-rs/core/src/tools/runtimes/shell.rs index d71c4498e..b46f72b48 100644 --- a/codex-rs/core/src/tools/runtimes/shell.rs +++ b/codex-rs/core/src/tools/runtimes/shell.rs @@ -133,14 +133,14 @@ impl ToolRuntime for ShellRuntime { &req.command, &req.cwd, &req.env, - req.timeout_ms, + req.timeout_ms.into(), req.with_escalated_permissions, req.justification.clone(), )?; let env = attempt - .env_for(&spec) + .env_for(spec) .map_err(|err| ToolError::Codex(err.into()))?; - let out = execute_env(&env, attempt.policy, Self::stdout_stream(ctx)) + let out = execute_env(env, attempt.policy, Self::stdout_stream(ctx)) .await .map_err(ToolError::Codex)?; Ok(out) diff --git a/codex-rs/core/src/tools/runtimes/unified_exec.rs b/codex-rs/core/src/tools/runtimes/unified_exec.rs index 5b18476bf..3f0362259 100644 --- a/codex-rs/core/src/tools/runtimes/unified_exec.rs +++ b/codex-rs/core/src/tools/runtimes/unified_exec.rs @@ -6,6 +6,7 @@ the session manager to spawn PTYs once an ExecEnv is prepared. */ use crate::error::CodexErr; use crate::error::SandboxErr; +use crate::exec::ExecExpiration; use crate::tools::runtimes::build_command_spec; use crate::tools::sandboxing::Approvable; use crate::tools::sandboxing::ApprovalCtx; @@ -150,13 +151,13 @@ impl<'a> ToolRuntime for UnifiedExecRunt &req.command, &req.cwd, &req.env, - None, + ExecExpiration::DefaultTimeout, req.with_escalated_permissions, req.justification.clone(), ) .map_err(|_| ToolError::Rejected("missing command line for PTY".to_string()))?; let exec_env = attempt - .env_for(&spec) + .env_for(spec) .map_err(|err| ToolError::Codex(err.into()))?; self.manager .open_session_with_exec_env(&exec_env) diff --git a/codex-rs/core/src/tools/sandboxing.rs b/codex-rs/core/src/tools/sandboxing.rs index e694c7fbe..f9e3e20ea 100644 --- a/codex-rs/core/src/tools/sandboxing.rs +++ b/codex-rs/core/src/tools/sandboxing.rs @@ -216,7 +216,7 @@ pub(crate) struct SandboxAttempt<'a> { impl<'a> SandboxAttempt<'a> { pub fn env_for( &self, - spec: &CommandSpec, + spec: CommandSpec, ) -> Result { self.manager.transform( spec, diff --git a/codex-rs/core/tests/suite/exec.rs b/codex-rs/core/tests/suite/exec.rs index ea5ab8487..bb0f1bce0 100644 --- a/codex-rs/core/tests/suite/exec.rs +++ b/codex-rs/core/tests/suite/exec.rs @@ -32,7 +32,7 @@ async fn run_test_cmd(tmp: TempDir, cmd: Vec<&str>) -> Result