refactor: prepare unified exec for zsh-fork backend (#13392)

## Why

`shell_zsh_fork` already provides stronger guarantees around which
executables receive elevated permissions. To reuse that machinery from
unified exec without pushing Unix-specific escalation details through
generic runtime code, the escalation bootstrap and session lifetime
handling need a cleaner boundary.

That boundary also needs to be safe for long-lived sessions: when an
intercepted shell session is closed or pruned, any in-flight approval
workers and any already-approved escalated child they spawned must be
torn down with the session, and the inherited escalation socket must not
leak into unrelated subprocesses.

## What Changed

- Extracted a reusable `EscalationSession` and
`EscalateServer::start_session(...)` in `shell-escalation` so callers
can get the wrapper/socket env overlay and keep the escalation server
alive without immediately running a one-shot command.
- Documented that `EscalationSession::env()` and
`ShellCommandExecutor::run(...)` exchange only that env overlay, which
callers must merge into their own base shell environment.
- Clarified the prepared-exec helper boundary in `core` by naming the
new helper APIs around `ExecRequest`, while keeping the legacy
`execute_env(...)` entrypoints as thin compatibility wrappers for
existing callers that still use the older naming.
- Added a small post-spawn hook on the prepared execution path so the
parent copy of the inheritable escalation socket is closed immediately
after both the existing one-shot shell-command spawn and the
unified-exec spawn.
- Made session teardown explicit with session-scoped cancellation:
dropping an `EscalationSession` or canceling its parent request now
stops intercept workers, and the server-spawned escalated child uses
`kill_on_drop(true)` so teardown cannot orphan an already-approved
child.
- Added `UnifiedExecBackendConfig` plumbing through `ToolsConfig`, a
`shell::zsh_fork_backend` facade, and an opaque unified-exec
spawn-lifecycle hook so unified exec can prepare a wrapped `zsh -c/-lc`
request without storing `EscalationSession` directly in generic
process/runtime code.
- Kept the existing `shell_command` zsh-fork behavior intact on top of
the new bootstrap path. Tool selection is unchanged in this PR: when
`shell_zsh_fork` is enabled, `ShellCommand` still wins over
`exec_command`.

## Verification

- `cargo test -p codex-shell-escalation`
  - includes coverage for `start_session_exposes_wrapper_env_overlay`
  - includes coverage for `exec_closes_parent_socket_after_shell_spawn`
  - includes coverage for `dropping_session_aborts_intercept_workers_and_kills_spawned_child`
- `cargo test -p codex-core
shell_zsh_fork_prefers_shell_command_over_unified_exec`
- `cargo test -p codex-core --test all
shell_zsh_fork_prompts_for_skill_script_execution`


---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/13392).
* #13432
* __->__ #13392
This commit is contained in:
Michael Bolin 2026-03-05 00:55:12 -08:00 committed by GitHub
parent 1ce1712aeb
commit b4cb989563
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 821 additions and 76 deletions

View file

@ -231,10 +231,11 @@ pub async fn process_exec_tool_call(
crate::sandboxing::execute_env(exec_req, stdout_stream).await
}
pub(crate) async fn execute_exec_env(
env: ExecRequest,
pub(crate) async fn execute_exec_request(
exec_request: ExecRequest,
sandbox_policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> Result<ExecToolCallOutput> {
let ExecRequest {
command,
@ -248,7 +249,7 @@ pub(crate) async fn execute_exec_env(
sandbox_policy: _sandbox_policy_from_env,
justification,
arg0,
} = env;
} = exec_request;
let params = ExecParams {
command,
@ -263,7 +264,7 @@ pub(crate) async fn execute_exec_env(
};
let start = Instant::now();
let raw_output_result = exec(params, sandbox, sandbox_policy, stdout_stream).await;
let raw_output_result = exec(params, sandbox, sandbox_policy, stdout_stream, after_spawn).await;
let duration = start.elapsed();
finalize_exec_result(raw_output_result, sandbox, duration)
}
@ -693,6 +694,7 @@ async fn exec(
sandbox: SandboxType,
sandbox_policy: &SandboxPolicy,
stdout_stream: Option<StdoutStream>,
after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> Result<RawExecToolCallOutput> {
#[cfg(target_os = "windows")]
if sandbox == SandboxType::WindowsRestrictedToken
@ -738,6 +740,9 @@ async fn exec(
env,
})
.await?;
if let Some(after_spawn) = after_spawn {
after_spawn();
}
consume_truncated_output(child, expiration, stdout_stream).await
}
@ -1136,6 +1141,7 @@ mod tests {
SandboxType::None,
&SandboxPolicy::new_read_only_policy(),
None,
None,
)
.await?;
assert!(output.timed_out);

View file

@ -10,7 +10,7 @@ use crate::exec::ExecExpiration;
use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::execute_exec_env;
use crate::exec::execute_exec_request;
use crate::landlock::allow_network_for_proxy;
use crate::landlock::create_linux_sandbox_command_args;
use crate::protocol::SandboxPolicy;
@ -418,11 +418,20 @@ impl SandboxManager {
}
pub async fn execute_env(
env: ExecRequest,
exec_request: ExecRequest,
stdout_stream: Option<StdoutStream>,
) -> crate::error::Result<ExecToolCallOutput> {
let effective_policy = env.sandbox_policy.clone();
execute_exec_env(env, &effective_policy, stdout_stream).await
let effective_policy = exec_request.sandbox_policy.clone();
execute_exec_request(exec_request, &effective_policy, stdout_stream, None).await
}
/// Like `execute_env`, but also forwards an `after_spawn` hook that is
/// invoked immediately after the child process has been spawned (used to
/// close the parent copy of the inheritable escalation socket).
pub async fn execute_exec_request_with_after_spawn(
    exec_request: ExecRequest,
    stdout_stream: Option<StdoutStream>,
    after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> crate::error::Result<ExecToolCallOutput> {
    // The request embeds its own sandbox policy; clone it out so the request
    // can be moved while the policy is passed by reference.
    let effective_policy = exec_request.sandbox_policy.clone();
    execute_exec_request(exec_request, &effective_policy, stdout_stream, after_spawn).await
}
#[cfg(test)]

View file

@ -14,7 +14,7 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::StdoutStream;
use crate::exec::StreamOutput;
use crate::exec::execute_exec_env;
use crate::exec::execute_exec_request;
use crate::exec_env::create_env;
use crate::parse_command::parse_command;
use crate::protocol::EventMsg;
@ -177,7 +177,7 @@ pub(crate) async fn execute_user_shell_command(
tx_event: session.get_tx_event(),
});
let exec_result = execute_exec_env(exec_env, &sandbox_policy, stdout_stream)
let exec_result = execute_exec_request(exec_env, &sandbox_policy, stdout_stream, None)
.or_cancel(&cancellation_token)
.await;

View file

@ -5,7 +5,8 @@ Executes shell requests under the orchestrator: asks for approval when needed,
builds a CommandSpec, and runs it under the current SandboxAttempt.
*/
#[cfg(unix)]
mod unix_escalation;
pub(crate) mod unix_escalation;
pub(crate) mod zsh_fork_backend;
use crate::command_canonicalization::canonicalize_command_for_approval;
use crate::exec::ExecToolCallOutput;
@ -80,7 +81,6 @@ pub(crate) enum ShellRuntimeBackend {
#[derive(Default)]
pub struct ShellRuntime {
#[cfg_attr(not(unix), allow(dead_code))]
backend: ShellRuntimeBackend,
}
@ -215,9 +215,8 @@ impl ToolRuntime<ShellRequest, ExecToolCallOutput> for ShellRuntime {
command
};
#[cfg(unix)]
if self.backend == ShellRuntimeBackend::ShellCommandZshFork {
match unix_escalation::try_run_zsh_fork(req, attempt, ctx, &command).await? {
match zsh_fork_backend::maybe_run_shell_command(req, attempt, ctx, &command).await? {
Some(out) => return Ok(out),
None => {
tracing::warn!(

View file

@ -6,6 +6,7 @@ use crate::exec::ExecToolCallOutput;
use crate::exec::SandboxType;
use crate::exec::is_likely_sandbox_denied;
use crate::features::Feature;
use crate::sandboxing::ExecRequest;
use crate::sandboxing::SandboxPermissions;
use crate::shell::ShellType;
use crate::skills::SkillMetadata;
@ -36,6 +37,7 @@ use codex_shell_escalation::EscalationDecision;
use codex_shell_escalation::EscalationExecution;
use codex_shell_escalation::EscalationPermissions;
use codex_shell_escalation::EscalationPolicy;
use codex_shell_escalation::EscalationSession;
use codex_shell_escalation::ExecParams;
use codex_shell_escalation::ExecResult;
use codex_shell_escalation::Permissions as EscalatedPermissions;
@ -51,6 +53,11 @@ use tokio::sync::RwLock;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
/// Output of `prepare_unified_exec_zsh_fork`: the exec request with the
/// escalation wrapper/socket env overlay merged in, plus the session handle
/// that keeps the escalation server alive while the spawned shell runs.
pub(crate) struct PreparedUnifiedExecZshFork {
    pub(crate) exec_request: ExecRequest,
    pub(crate) escalation_session: EscalationSession,
}
pub(super) async fn try_run_zsh_fork(
req: &ShellRequest,
attempt: &SandboxAttempt<'_>,
@ -95,7 +102,7 @@ pub(super) async fn try_run_zsh_fork(
justification,
arg0,
} = sandbox_exec_request;
let ParsedShellCommand { script, login } = extract_shell_script(&command)?;
let ParsedShellCommand { script, login, .. } = extract_shell_script(&command)?;
let effective_timeout = Duration::from_millis(
req.timeout_ms
.unwrap_or(crate::exec::DEFAULT_EXEC_COMMAND_TIMEOUT_MS),
@ -172,6 +179,103 @@ pub(super) async fn try_run_zsh_fork(
map_exec_result(attempt.sandbox, exec_result).map(Some)
}
/// Prepares a unified-exec request to run through the zsh-fork escalation
/// backend.
///
/// Returns `Ok(None)` — telling the caller to fall back to the direct spawn
/// path — when any precondition fails: `shell_zsh_path` is not configured,
/// the `ShellZshFork` feature is disabled, the user shell is not zsh, or the
/// command does not contain a `-c`/`-lc` invocation of the configured zsh
/// binary.
///
/// On success, returns the request with the escalation env overlay merged
/// into its environment, plus the `EscalationSession` lifetime handle.
pub(crate) async fn prepare_unified_exec_zsh_fork(
    req: &crate::tools::runtimes::unified_exec::UnifiedExecRequest,
    attempt: &SandboxAttempt<'_>,
    ctx: &ToolCtx,
    exec_request: ExecRequest,
) -> Result<Option<PreparedUnifiedExecZshFork>, ToolError> {
    let Some(shell_zsh_path) = ctx.session.services.shell_zsh_path.as_ref() else {
        tracing::warn!("ZshFork backend specified, but shell_zsh_path is not configured.");
        return Ok(None);
    };
    if !ctx.session.features().enabled(Feature::ShellZshFork) {
        tracing::warn!("ZshFork backend specified, but ShellZshFork feature is not enabled.");
        return Ok(None);
    }
    if !matches!(ctx.session.user_shell().shell_type, ShellType::Zsh) {
        tracing::warn!("ZshFork backend specified, but user shell is not Zsh.");
        return Ok(None);
    }
    // Locate the `-c`/`-lc` triple inside the (possibly wrapped) argv; a
    // command without one cannot go through the zsh-fork path.
    let parsed = match extract_shell_script(&exec_request.command) {
        Ok(parsed) => parsed,
        Err(err) => {
            tracing::warn!("ZshFork unified exec fallback: {err:?}");
            return Ok(None);
        }
    };
    // Only intercept invocations of the configured zsh binary itself.
    if parsed.program != shell_zsh_path.to_string_lossy() {
        tracing::warn!(
            "ZshFork backend specified, but unified exec command targets `{}` instead of `{}`.",
            parsed.program,
            shell_zsh_path.display(),
        );
        return Ok(None);
    }
    // Clone the current exec policy into a fresh lock for this session.
    let exec_policy = Arc::new(RwLock::new(
        ctx.session.services.exec_policy.current().as_ref().clone(),
    ));
    // Executor the escalation server uses to run escalated commands; it
    // carries the full sandbox/exec context of the original request.
    let command_executor = CoreShellCommandExecutor {
        command: exec_request.command.clone(),
        cwd: exec_request.cwd.clone(),
        sandbox_policy: exec_request.sandbox_policy.clone(),
        sandbox: exec_request.sandbox,
        env: exec_request.env.clone(),
        network: exec_request.network.clone(),
        windows_sandbox_level: exec_request.windows_sandbox_level,
        sandbox_permissions: exec_request.sandbox_permissions,
        justification: exec_request.justification.clone(),
        arg0: exec_request.arg0.clone(),
        sandbox_policy_cwd: ctx.turn.cwd.clone(),
        macos_seatbelt_profile_extensions: ctx
            .turn
            .config
            .permissions
            .macos_seatbelt_profile_extensions
            .clone(),
        codex_linux_sandbox_exe: ctx.turn.codex_linux_sandbox_exe.clone(),
        use_linux_sandbox_bwrap: ctx.turn.features.enabled(Feature::UseLinuxSandboxBwrap),
    };
    let main_execve_wrapper_exe = ctx
        .session
        .services
        .main_execve_wrapper_exe
        .clone()
        .ok_or_else(|| {
            ToolError::Rejected(
                "zsh fork feature enabled, but execve wrapper is not configured".to_string(),
            )
        })?;
    // Action provider carrying the session/turn approval context used to
    // decide intercepted escalations.
    // NOTE(review): uses an unlimited stopwatch — confirm that session-scoped
    // escalations should not inherit the request timeout here.
    let escalation_policy = CoreShellActionProvider {
        policy: Arc::clone(&exec_policy),
        session: Arc::clone(&ctx.session),
        turn: Arc::clone(&ctx.turn),
        call_id: ctx.call_id.clone(),
        approval_policy: ctx.turn.approval_policy.value(),
        sandbox_policy: attempt.policy.clone(),
        sandbox_permissions: req.sandbox_permissions,
        prompt_permissions: req.additional_permissions.clone(),
        stopwatch: Stopwatch::unlimited(),
    };
    let escalate_server = EscalateServer::new(
        shell_zsh_path.clone(),
        main_execve_wrapper_exe,
        escalation_policy,
    );
    // Start the session without spawning a shell; unified exec owns process
    // creation and only needs the env overlay plus the lifetime handle.
    let escalation_session = escalate_server
        .start_session(CancellationToken::new(), Arc::new(command_executor))
        .map_err(|err| ToolError::Rejected(err.to_string()))?;
    // Merge the wrapper/socket overlay into the request's environment.
    let mut exec_request = exec_request;
    exec_request.env.extend(escalation_session.env().clone());
    Ok(Some(PreparedUnifiedExecZshFork {
        exec_request,
        escalation_session,
    }))
}
struct CoreShellActionProvider {
policy: Arc<RwLock<Policy>>,
session: Arc<crate::codex::Session>,
@ -648,17 +752,20 @@ impl ShellCommandExecutor for CoreShellCommandExecutor {
&self,
_command: Vec<String>,
_cwd: PathBuf,
env: HashMap<String, String>,
env_overlay: HashMap<String, String>,
cancel_rx: CancellationToken,
after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> anyhow::Result<ExecResult> {
let mut exec_env = self.env.clone();
// `env_overlay` comes from `EscalationSession::env()`, so merge only the
// wrapper/socket variables into the base shell environment.
for var in ["CODEX_ESCALATE_SOCKET", "EXEC_WRAPPER", "BASH_EXEC_WRAPPER"] {
if let Some(value) = env.get(var) {
if let Some(value) = env_overlay.get(var) {
exec_env.insert(var.to_string(), value.clone());
}
}
let result = crate::sandboxing::execute_env(
let result = crate::sandboxing::execute_exec_request_with_after_spawn(
crate::sandboxing::ExecRequest {
command: self.command.clone(),
cwd: self.cwd.clone(),
@ -673,6 +780,7 @@ impl ShellCommandExecutor for CoreShellCommandExecutor {
arg0: self.arg0.clone(),
},
None,
after_spawn,
)
.await?;
@ -809,6 +917,7 @@ impl CoreShellCommandExecutor {
/// Result of locating a `-c`/`-lc` invocation inside an argv (which may be
/// wrapped by environment/sandbox helper prefixes).
#[derive(Debug, Eq, PartialEq)]
struct ParsedShellCommand {
    // Argv entry immediately preceding the `-c`/`-lc` flag — the shell
    // binary being invoked (possibly behind wrapper prefixes).
    program: String,
    // Script text passed after `-c`/`-lc`.
    script: String,
    // True when the flag was `-lc` (login shell).
    login: bool,
}
@ -817,12 +926,20 @@ fn extract_shell_script(command: &[String]) -> Result<ParsedShellCommand, ToolEr
// Commands reaching zsh-fork can be wrapped by environment/sandbox helpers, so
// we search for the first `-c`/`-lc` triple anywhere in the argv rather
// than assuming it is the first positional form.
if let Some((script, login)) = command.windows(3).find_map(|parts| match parts {
[_, flag, script] if flag == "-c" => Some((script.to_owned(), false)),
[_, flag, script] if flag == "-lc" => Some((script.to_owned(), true)),
if let Some((program, script, login)) = command.windows(3).find_map(|parts| match parts {
[program, flag, script] if flag == "-c" => {
Some((program.to_owned(), script.to_owned(), false))
}
[program, flag, script] if flag == "-lc" => {
Some((program.to_owned(), script.to_owned(), true))
}
_ => None,
}) {
return Ok(ParsedShellCommand { script, login });
return Ok(ParsedShellCommand {
program,
script,
login,
});
}
Err(ToolError::Rejected(

View file

@ -64,6 +64,7 @@ fn extract_shell_script_preserves_login_flag() {
assert_eq!(
extract_shell_script(&["/bin/zsh".into(), "-lc".into(), "echo hi".into()]).unwrap(),
ParsedShellCommand {
program: "/bin/zsh".to_string(),
script: "echo hi".to_string(),
login: true,
}
@ -71,6 +72,7 @@ fn extract_shell_script_preserves_login_flag() {
assert_eq!(
extract_shell_script(&["/bin/zsh".into(), "-c".into(), "echo hi".into()]).unwrap(),
ParsedShellCommand {
program: "/bin/zsh".to_string(),
script: "echo hi".to_string(),
login: false,
}
@ -89,6 +91,7 @@ fn extract_shell_script_supports_wrapped_command_prefixes() {
])
.unwrap(),
ParsedShellCommand {
program: "/bin/zsh".to_string(),
script: "echo hello".to_string(),
login: true,
}
@ -105,6 +108,7 @@ fn extract_shell_script_supports_wrapped_command_prefixes() {
])
.unwrap(),
ParsedShellCommand {
program: "/bin/zsh".to_string(),
script: "pwd".to_string(),
login: false,
}

View file

@ -0,0 +1,115 @@
use super::ShellRequest;
use crate::exec::ExecToolCallOutput;
use crate::sandboxing::ExecRequest;
use crate::tools::runtimes::unified_exec::UnifiedExecRequest;
use crate::tools::sandboxing::SandboxAttempt;
use crate::tools::sandboxing::ToolCtx;
use crate::tools::sandboxing::ToolError;
use crate::unified_exec::SpawnLifecycleHandle;
/// Output of `maybe_prepare_unified_exec`: the transformed exec request plus
/// the spawn-lifecycle hook unified exec must retain for the session's
/// lifetime.
pub(crate) struct PreparedUnifiedExecSpawn {
    pub(crate) exec_request: ExecRequest,
    pub(crate) spawn_lifecycle: SpawnLifecycleHandle,
}
/// Runs the zsh-fork shell-command backend when this request should be handled
/// by executable-level escalation instead of the default shell runtime.
///
/// Returns `Ok(None)` when the current platform or request shape should fall
/// back to the normal shell-command path; on non-Unix platforms this is
/// always `Ok(None)`.
pub(crate) async fn maybe_run_shell_command(
    req: &ShellRequest,
    attempt: &SandboxAttempt<'_>,
    ctx: &ToolCtx,
    command: &[String],
) -> Result<Option<ExecToolCallOutput>, ToolError> {
    // Delegate to the platform-specific implementation (`imp` is a stub off
    // Unix).
    imp::maybe_run_shell_command(req, attempt, ctx, command).await
}
/// Prepares unified exec to launch through the zsh-fork backend when the
/// request matches a wrapped `zsh -c/-lc` command on a supported platform.
///
/// Returns the transformed `ExecRequest` plus a spawn lifecycle that keeps the
/// escalation server alive for the session and performs post-spawn cleanup.
/// Returns `Ok(None)` when unified exec should use its normal spawn path;
/// on non-Unix platforms this is always `Ok(None)`.
pub(crate) async fn maybe_prepare_unified_exec(
    req: &UnifiedExecRequest,
    attempt: &SandboxAttempt<'_>,
    ctx: &ToolCtx,
    exec_request: ExecRequest,
) -> Result<Option<PreparedUnifiedExecSpawn>, ToolError> {
    // Delegate to the platform-specific implementation (`imp` is a stub off
    // Unix).
    imp::maybe_prepare_unified_exec(req, attempt, ctx, exec_request).await
}
#[cfg(unix)]
mod imp {
    use super::*;
    use crate::tools::runtimes::shell::unix_escalation;
    use crate::unified_exec::SpawnLifecycle;
    use codex_shell_escalation::EscalationSession;

    /// Keeps the escalation session alive for the spawned process and, once
    /// the child has been spawned, closes the parent copy of the inheritable
    /// client socket so it cannot leak into unrelated subprocesses.
    #[derive(Debug)]
    struct ZshForkSpawnLifecycle {
        escalation_session: EscalationSession,
    }

    impl SpawnLifecycle for ZshForkSpawnLifecycle {
        fn after_spawn(&mut self) {
            self.escalation_session.close_client_socket();
        }
    }

    /// Unix implementation: run the request through the zsh-fork path.
    pub(super) async fn maybe_run_shell_command(
        req: &ShellRequest,
        attempt: &SandboxAttempt<'_>,
        ctx: &ToolCtx,
        command: &[String],
    ) -> Result<Option<ExecToolCallOutput>, ToolError> {
        unix_escalation::try_run_zsh_fork(req, attempt, ctx, command).await
    }

    /// Unix implementation: wrap the prepared request and its escalation
    /// session into an opaque spawn lifecycle for unified exec.
    pub(super) async fn maybe_prepare_unified_exec(
        req: &UnifiedExecRequest,
        attempt: &SandboxAttempt<'_>,
        ctx: &ToolCtx,
        exec_request: ExecRequest,
    ) -> Result<Option<PreparedUnifiedExecSpawn>, ToolError> {
        let maybe_prepared =
            unix_escalation::prepare_unified_exec_zsh_fork(req, attempt, ctx, exec_request)
                .await?;
        Ok(maybe_prepared.map(|prepared| PreparedUnifiedExecSpawn {
            exec_request: prepared.exec_request,
            spawn_lifecycle: Box::new(ZshForkSpawnLifecycle {
                escalation_session: prepared.escalation_session,
            }),
        }))
    }
}
#[cfg(not(unix))]
mod imp {
    use super::*;

    /// Non-Unix stub: escalation is unavailable, so always fall back to the
    /// normal shell-command path.
    pub(super) async fn maybe_run_shell_command(
        _req: &ShellRequest,
        _attempt: &SandboxAttempt<'_>,
        _ctx: &ToolCtx,
        _command: &[String],
    ) -> Result<Option<ExecToolCallOutput>, ToolError> {
        Ok(None)
    }

    /// Non-Unix stub: unified exec always uses its normal spawn path.
    pub(super) async fn maybe_prepare_unified_exec(
        _req: &UnifiedExecRequest,
        _attempt: &SandboxAttempt<'_>,
        _ctx: &ToolCtx,
        _exec_request: ExecRequest,
    ) -> Result<Option<PreparedUnifiedExecSpawn>, ToolError> {
        Ok(None)
    }
}

View file

@ -16,6 +16,7 @@ use crate::tools::network_approval::NetworkApprovalMode;
use crate::tools::network_approval::NetworkApprovalSpec;
use crate::tools::runtimes::build_command_spec;
use crate::tools::runtimes::maybe_wrap_shell_lc_with_snapshot;
use crate::tools::runtimes::shell::zsh_fork_backend;
use crate::tools::sandboxing::Approvable;
use crate::tools::sandboxing::ApprovalCtx;
use crate::tools::sandboxing::ExecApprovalRequirement;
@ -28,6 +29,8 @@ use crate::tools::sandboxing::ToolError;
use crate::tools::sandboxing::ToolRuntime;
use crate::tools::sandboxing::sandbox_override_for_first_attempt;
use crate::tools::sandboxing::with_cached_approval;
use crate::tools::spec::UnifiedExecBackendConfig;
use crate::unified_exec::NoopSpawnLifecycle;
use crate::unified_exec::UnifiedExecError;
use crate::unified_exec::UnifiedExecProcess;
use crate::unified_exec::UnifiedExecProcessManager;
@ -63,11 +66,12 @@ pub struct UnifiedExecApprovalKey {
pub struct UnifiedExecRuntime<'a> {
manager: &'a UnifiedExecProcessManager,
backend: UnifiedExecBackendConfig,
}
impl<'a> UnifiedExecRuntime<'a> {
pub fn new(manager: &'a UnifiedExecProcessManager) -> Self {
Self { manager }
pub fn new(manager: &'a UnifiedExecProcessManager, backend: UnifiedExecBackendConfig) -> Self {
Self { manager, backend }
}
}
@ -184,6 +188,47 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
if let Some(network) = req.network.as_ref() {
network.apply_to_env(&mut env);
}
if self.backend == UnifiedExecBackendConfig::ZshFork {
let spec = build_command_spec(
&command,
&req.cwd,
&env,
ExecExpiration::DefaultTimeout,
req.sandbox_permissions,
req.additional_permissions.clone(),
req.justification.clone(),
)
.map_err(|_| ToolError::Rejected("missing command line for PTY".to_string()))?;
let exec_env = attempt
.env_for(spec, req.network.as_ref())
.map_err(|err| ToolError::Codex(err.into()))?;
match zsh_fork_backend::maybe_prepare_unified_exec(req, attempt, ctx, exec_env).await? {
Some(prepared) => {
return self
.manager
.open_session_with_exec_env(
&prepared.exec_request,
req.tty,
prepared.spawn_lifecycle,
)
.await
.map_err(|err| match err {
UnifiedExecError::SandboxDenied { output, .. } => {
ToolError::Codex(CodexErr::Sandbox(SandboxErr::Denied {
output: Box::new(output),
network_policy_decision: None,
}))
}
other => ToolError::Rejected(other.to_string()),
});
}
None => {
tracing::warn!(
"UnifiedExec ZshFork backend specified, but conditions for using it were not met, falling back to direct execution",
);
}
}
}
let spec = build_command_spec(
&command,
&req.cwd,
@ -198,7 +243,7 @@ impl<'a> ToolRuntime<UnifiedExecRequest, UnifiedExecProcess> for UnifiedExecRunt
.env_for(spec, req.network.as_ref())
.map_err(|err| ToolError::Codex(err.into()))?;
self.manager
.open_session_with_exec_env(&exec_env, req.tty)
.open_session_with_exec_env(&exec_env, req.tty, Box::new(NoopSpawnLifecycle))
.await
.map_err(|err| match err {
UnifiedExecError::SandboxDenied { output, .. } => {

View file

@ -44,10 +44,17 @@ pub enum ShellCommandBackendConfig {
ZshFork,
}
/// Selects how the unified exec tool spawns its shell process.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum UnifiedExecBackendConfig {
    /// Spawn the command directly, without escalation support.
    Direct,
    /// Route wrapped `zsh -c`/`-lc` commands through the zsh-fork escalation
    /// backend when its preconditions are met; otherwise fall back to direct
    /// execution.
    ZshFork,
}
#[derive(Debug, Clone)]
pub(crate) struct ToolsConfig {
pub shell_type: ConfigShellToolType,
shell_command_backend: ShellCommandBackendConfig,
pub unified_exec_backend: UnifiedExecBackendConfig,
pub allow_login_shell: bool,
pub apply_patch_tool_type: Option<ApplyPatchToolType>,
pub web_search_mode: Option<WebSearchMode>,
@ -102,6 +109,12 @@ impl ToolsConfig {
} else {
ShellCommandBackendConfig::Classic
};
let unified_exec_backend =
if features.enabled(Feature::ShellTool) && features.enabled(Feature::ShellZshFork) {
UnifiedExecBackendConfig::ZshFork
} else {
UnifiedExecBackendConfig::Direct
};
let shell_type = if !features.enabled(Feature::ShellTool) {
ConfigShellToolType::Disabled
@ -140,6 +153,7 @@ impl ToolsConfig {
Self {
shell_type,
shell_command_backend,
unified_exec_backend,
allow_login_shell: true,
apply_patch_tool_type,
web_search_mode: *web_search_mode,
@ -2817,6 +2831,10 @@ mod tests {
tools_config.shell_command_backend,
ShellCommandBackendConfig::ZshFork
);
assert_eq!(
tools_config.unified_exec_backend,
UnifiedExecBackendConfig::ZshFork
);
}
#[test]

View file

@ -49,6 +49,10 @@ pub(crate) fn set_deterministic_process_ids_for_tests(enabled: bool) {
}
pub(crate) use errors::UnifiedExecError;
pub(crate) use process::NoopSpawnLifecycle;
#[cfg(unix)]
pub(crate) use process::SpawnLifecycle;
pub(crate) use process::SpawnLifecycleHandle;
pub(crate) use process::UnifiedExecProcess;
pub(crate) const MIN_YIELD_TIME_MS: u64 = 250;

View file

@ -24,6 +24,17 @@ use super::UNIFIED_EXEC_OUTPUT_MAX_TOKENS;
use super::UnifiedExecError;
use super::head_tail_buffer::HeadTailBuffer;
/// Hook tied to the lifetime of a unified-exec spawned process.
///
/// `after_spawn` runs immediately after the PTY process has been spawned;
/// the default implementation does nothing. Implementations are kept alive
/// for the lifetime of the owning `UnifiedExecProcess`, so their `Drop` can
/// perform session teardown.
pub(crate) trait SpawnLifecycle: std::fmt::Debug + Send + Sync {
    fn after_spawn(&mut self) {}
}

/// Boxed, dynamically dispatched `SpawnLifecycle`.
pub(crate) type SpawnLifecycleHandle = Box<dyn SpawnLifecycle>;

/// `SpawnLifecycle` with no post-spawn behavior, used by the direct backend.
#[derive(Debug, Default)]
pub(crate) struct NoopSpawnLifecycle;

impl SpawnLifecycle for NoopSpawnLifecycle {}
pub(crate) type OutputBuffer = Arc<Mutex<HeadTailBuffer>>;
pub(crate) struct OutputHandles {
pub(crate) output_buffer: OutputBuffer,
@ -44,6 +55,7 @@ pub(crate) struct UnifiedExecProcess {
output_drained: Arc<Notify>,
output_task: JoinHandle<()>,
sandbox_type: SandboxType,
_spawn_lifecycle: SpawnLifecycleHandle,
}
impl UnifiedExecProcess {
@ -51,6 +63,7 @@ impl UnifiedExecProcess {
process_handle: ExecCommandSession,
initial_output_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
sandbox_type: SandboxType,
spawn_lifecycle: SpawnLifecycleHandle,
) -> Self {
let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
let output_notify = Arc::new(Notify::new());
@ -92,6 +105,7 @@ impl UnifiedExecProcess {
output_drained,
output_task,
sandbox_type,
_spawn_lifecycle: spawn_lifecycle,
}
}
@ -196,13 +210,14 @@ impl UnifiedExecProcess {
pub(super) async fn from_spawned(
spawned: SpawnedPty,
sandbox_type: SandboxType,
spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<Self, UnifiedExecError> {
let SpawnedPty {
session: process_handle,
output_rx,
mut exit_rx,
} = spawned;
let managed = Self::new(process_handle, output_rx, sandbox_type);
let managed = Self::new(process_handle, output_rx, sandbox_type, spawn_lifecycle);
let exit_ready = matches!(exit_rx.try_recv(), Ok(_) | Err(TryRecvError::Closed));

View file

@ -49,6 +49,7 @@ use crate::unified_exec::generate_chunk_id;
use crate::unified_exec::head_tail_buffer::HeadTailBuffer;
use crate::unified_exec::process::OutputBuffer;
use crate::unified_exec::process::OutputHandles;
use crate::unified_exec::process::SpawnLifecycleHandle;
use crate::unified_exec::process::UnifiedExecProcess;
use crate::unified_exec::resolve_max_tokens;
@ -528,6 +529,7 @@ impl UnifiedExecProcessManager {
&self,
env: &ExecRequest,
tty: bool,
mut spawn_lifecycle: SpawnLifecycleHandle,
) -> Result<UnifiedExecProcess, UnifiedExecError> {
let (program, args) = env
.command
@ -555,7 +557,8 @@ impl UnifiedExecProcessManager {
};
let spawned =
spawn_result.map_err(|err| UnifiedExecError::create_process(err.to_string()))?;
UnifiedExecProcess::from_spawned(spawned, env.sandbox).await
spawn_lifecycle.after_spawn();
UnifiedExecProcess::from_spawned(spawned, env.sandbox, spawn_lifecycle).await
}
pub(super) async fn open_session_with_sandbox(
@ -569,7 +572,8 @@ impl UnifiedExecProcessManager {
Some(context.session.conversation_id),
));
let mut orchestrator = ToolOrchestrator::new();
let mut runtime = UnifiedExecRuntime::new(self);
let mut runtime =
UnifiedExecRuntime::new(self, context.turn.tools_config.unified_exec_backend);
let exec_approval_requirement = context
.session
.services

View file

@ -14,6 +14,8 @@ pub use unix::EscalationPermissions;
#[cfg(unix)]
pub use unix::EscalationPolicy;
#[cfg(unix)]
pub use unix::EscalationSession;
#[cfg(unix)]
pub use unix::ExecParams;
#[cfg(unix)]
pub use unix::ExecResult;

View file

@ -3,11 +3,14 @@ use std::os::fd::AsRawFd;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use anyhow::Context as _;
use codex_utils_absolute_path::AbsolutePathBuf;
use socket2::Socket;
use tokio::process::Command;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::unix::escalate_protocol::ESCALATE_SOCKET_ENV_VAR;
@ -32,12 +35,20 @@ use crate::unix::socket::AsyncSocket;
#[async_trait::async_trait]
pub trait ShellCommandExecutor: Send + Sync {
/// Runs the requested shell command and returns the captured result.
///
/// `env_overlay` contains only the wrapper/socket variables exported by
/// `EscalationSession::env()`, not a complete child environment.
/// Implementations should merge it into whatever base environment they use
/// for the shell process. `after_spawn` should be invoked immediately after
/// the shell process has been spawned so the parent copy of the inherited
/// escalation socket can be closed.
async fn run(
&self,
command: Vec<String>,
cwd: PathBuf,
env: HashMap<String, String>,
env_overlay: HashMap<String, String>,
cancel_rx: CancellationToken,
after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> anyhow::Result<ExecResult>;
/// Prepares an escalated subcommand for execution on the server side.
@ -82,6 +93,38 @@ pub struct PreparedExec {
pub arg0: Option<String>,
}
/// Lifetime handle for a running escalation server session.
///
/// Holds the env overlay the execve wrapper needs, the background intercept
/// task, the parent copy of the inheritable client socket, and a
/// session-scoped cancellation token. Dropping the session closes the
/// socket, cancels the token, and aborts the task.
#[derive(Debug)]
pub struct EscalationSession {
    // Wrapper/socket env overlay returned by `env()`; not a full child env.
    env: HashMap<String, String>,
    // Background `escalate_task`; aborted on drop.
    task: JoinHandle<anyhow::Result<()>>,
    // Parent copy of the client socket end; closed via `close_client_socket`.
    client_socket: Arc<Mutex<Option<Socket>>>,
    // Session-scoped token; cancelled on drop to stop intercept workers.
    cancellation_token: CancellationToken,
}
impl EscalationSession {
    /// Returns just the environment overlay needed by the execve wrapper.
    ///
    /// Callers should merge this into their own child-process environment
    /// rather than treating it as the full environment for the shell.
    pub fn env(&self) -> &HashMap<String, String> {
        &self.env
    }

    /// Drops the parent copy of the inheritable client socket so it cannot
    /// leak into subprocesses spawned after this point.
    ///
    /// Closes the socket even when the mutex is poisoned: the previous
    /// `if let Ok(..)` silently skipped closing on poison, leaking the
    /// inheritable fd, and dropping an `Option<Socket>` cannot itself
    /// observe the (possibly inconsistent) guarded state.
    pub fn close_client_socket(&self) {
        let mut client_socket = match self.client_socket.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        client_socket.take();
    }
}
impl Drop for EscalationSession {
    /// Tears the session down: close the inheritable socket, cancel the
    /// session-scoped token so intercept workers stop, and abort the server
    /// task so nothing outlives the session.
    fn drop(&mut self) {
        self.close_client_socket();
        self.cancellation_token.cancel();
        // Abort as a backstop in case the task has not yet observed the
        // cancelled token.
        self.task.abort();
    }
}
pub struct EscalateServer {
bash_path: PathBuf,
execve_wrapper: PathBuf,
@ -106,29 +149,9 @@ impl EscalateServer {
cancel_rx: CancellationToken,
command_executor: Arc<dyn ShellCommandExecutor>,
) -> anyhow::Result<ExecResult> {
let (escalate_server, escalate_client) = AsyncDatagramSocket::pair()?;
let client_socket = escalate_client.into_inner();
// Only the client endpoint should cross exec into the wrapper process.
client_socket.set_cloexec(false)?;
let escalate_task = tokio::spawn(escalate_task(
escalate_server,
Arc::clone(&self.policy),
Arc::clone(&command_executor),
));
let mut env = std::env::vars().collect::<HashMap<String, String>>();
env.insert(
ESCALATE_SOCKET_ENV_VAR.to_string(),
client_socket.as_raw_fd().to_string(),
);
env.insert(
EXEC_WRAPPER_ENV_VAR.to_string(),
self.execve_wrapper.to_string_lossy().to_string(),
);
env.insert(
LEGACY_BASH_EXEC_WRAPPER_ENV_VAR.to_string(),
self.execve_wrapper.to_string_lossy().to_string(),
);
let session = self.start_session(cancel_rx.clone(), Arc::clone(&command_executor))?;
let env_overlay = session.env().clone();
let client_socket = Arc::clone(&session.client_socket);
let command = vec![
self.bash_path.to_string_lossy().to_string(),
if params.login == Some(false) {
@ -140,20 +163,80 @@ impl EscalateServer {
];
let workdir = AbsolutePathBuf::try_from(params.workdir)?;
let result = command_executor
.run(command, workdir.to_path_buf(), env, cancel_rx)
.run(
command,
workdir.to_path_buf(),
env_overlay,
cancel_rx,
Some(Box::new(move || {
if let Ok(mut client_socket) = client_socket.lock() {
client_socket.take();
}
})),
)
.await?;
escalate_task.abort();
Ok(result)
}
/// Starts an escalation session and returns the environment overlay a shell
/// needs in order to route intercepted execs through this server.
///
/// This does not spawn the shell itself. Callers own process creation and
/// only use the returned environment plus the session lifetime handle.
pub fn start_session(
    &self,
    parent_cancellation_token: CancellationToken,
    command_executor: Arc<dyn ShellCommandExecutor>,
) -> anyhow::Result<EscalationSession> {
    // Session-scoped token: cancelled when the returned session is dropped.
    let cancellation_token = CancellationToken::new();
    let (escalate_server, escalate_client) = AsyncDatagramSocket::pair()?;
    let client_socket = escalate_client.into_inner();
    // Capture the raw fd before the socket moves into the shared Option.
    let client_socket_fd = client_socket.as_raw_fd();
    // Only the client endpoint should cross exec into the wrapper process.
    client_socket.set_cloexec(false)?;
    // Shared so the caller's after-spawn hook can close the parent copy once
    // the child has inherited the fd.
    let client_socket = Arc::new(Mutex::new(Some(client_socket)));
    let task = tokio::spawn(escalate_task(
        escalate_server,
        Arc::clone(&self.policy),
        Arc::clone(&command_executor),
        parent_cancellation_token,
        cancellation_token.clone(),
    ));
    // Env overlay consumed by the execve wrapper: the inherited socket fd
    // plus both the current and legacy wrapper-path variables.
    let mut env = HashMap::new();
    env.insert(
        ESCALATE_SOCKET_ENV_VAR.to_string(),
        client_socket_fd.to_string(),
    );
    env.insert(
        EXEC_WRAPPER_ENV_VAR.to_string(),
        self.execve_wrapper.to_string_lossy().to_string(),
    );
    env.insert(
        LEGACY_BASH_EXEC_WRAPPER_ENV_VAR.to_string(),
        self.execve_wrapper.to_string_lossy().to_string(),
    );
    Ok(EscalationSession {
        env,
        task,
        client_socket,
        cancellation_token,
    })
}
}
async fn escalate_task(
socket: AsyncDatagramSocket,
policy: Arc<dyn EscalationPolicy>,
command_executor: Arc<dyn ShellCommandExecutor>,
parent_cancellation_token: CancellationToken,
session_cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
loop {
let (_, mut fds) = socket.receive_with_fds().await?;
let (_, mut fds) = tokio::select! {
received = socket.receive_with_fds() => received?,
_ = parent_cancellation_token.cancelled() => return Ok(()),
_ = session_cancellation_token.cancelled() => return Ok(()),
};
if fds.len() != 1 {
tracing::error!("expected 1 fd in datagram handshake, got {}", fds.len());
continue;
@ -161,9 +244,17 @@ async fn escalate_task(
let stream_socket = AsyncSocket::from_fd(fds.remove(0))?;
let policy = Arc::clone(&policy);
let command_executor = Arc::clone(&command_executor);
let parent_cancellation_token = parent_cancellation_token.clone();
let session_cancellation_token = session_cancellation_token.clone();
tokio::spawn(async move {
if let Err(err) =
handle_escalate_session_with_policy(stream_socket, policy, command_executor).await
if let Err(err) = handle_escalate_session_with_policy(
stream_socket,
policy,
command_executor,
parent_cancellation_token,
session_cancellation_token,
)
.await
{
tracing::error!("escalate session failed: {err:?}");
}
@ -175,18 +266,27 @@ async fn handle_escalate_session_with_policy(
socket: AsyncSocket,
policy: Arc<dyn EscalationPolicy>,
command_executor: Arc<dyn ShellCommandExecutor>,
parent_cancellation_token: CancellationToken,
session_cancellation_token: CancellationToken,
) -> anyhow::Result<()> {
let EscalateRequest {
file,
argv,
workdir,
env,
} = socket.receive::<EscalateRequest>().await?;
} = tokio::select! {
request = socket.receive::<EscalateRequest>() => request?,
_ = parent_cancellation_token.cancelled() => return Ok(()),
_ = session_cancellation_token.cancelled() => return Ok(()),
};
let program = AbsolutePathBuf::resolve_path_against_base(file, workdir.as_path())?;
let decision = policy
.determine_action(&program, &argv, &workdir)
.await
.context("failed to determine escalation action")?;
let decision = tokio::select! {
decision = policy.determine_action(&program, &argv, &workdir) => {
decision.context("failed to determine escalation action")?
}
_ = parent_cancellation_token.cancelled() => return Ok(()),
_ = session_cancellation_token.cancelled() => return Ok(()),
};
tracing::debug!("decided {decision:?} for {program:?} {argv:?} {workdir:?}");
@ -204,10 +304,13 @@ async fn handle_escalate_session_with_policy(
action: EscalateAction::Escalate,
})
.await?;
let (msg, fds) = socket
.receive_with_fds::<SuperExecMessage>()
.await
.context("failed to receive SuperExecMessage")?;
let (msg, fds) = tokio::select! {
message = socket.receive_with_fds::<SuperExecMessage>() => {
message.context("failed to receive SuperExecMessage")?
}
_ = parent_cancellation_token.cancelled() => return Ok(()),
_ = session_cancellation_token.cancelled() => return Ok(()),
};
if fds.len() != msg.fds.len() {
return Err(anyhow::anyhow!(
"mismatched number of fds in SuperExecMessage: {} in the message, {} from the control message",
@ -231,9 +334,11 @@ async fn handle_escalate_session_with_policy(
cwd,
env,
arg0,
} = command_executor
.prepare_escalated_exec(&program, &argv, &workdir, env, execution)
.await?;
} = tokio::select! {
prepared = command_executor.prepare_escalated_exec(&program, &argv, &workdir, env, execution) => prepared?,
_ = parent_cancellation_token.cancelled() => return Ok(()),
_ = session_cancellation_token.cancelled() => return Ok(()),
};
let (program, args) = command
.split_first()
.ok_or_else(|| anyhow::anyhow!("prepared escalated command must not be empty"))?;
@ -245,7 +350,8 @@ async fn handle_escalate_session_with_policy(
.current_dir(&cwd)
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null());
.stderr(Stdio::null())
.kill_on_drop(true);
unsafe {
command.pre_exec(move || {
for (dst_fd, src_fd) in msg.fds.iter().zip(&fds) {
@ -255,7 +361,17 @@ async fn handle_escalate_session_with_policy(
});
}
let mut child = command.spawn()?;
let exit_status = child.wait().await?;
let exit_status = tokio::select! {
status = child.wait() => status?,
_ = parent_cancellation_token.cancelled() => {
let _ = child.start_kill();
child.wait().await?
}
_ = session_cancellation_token.cancelled() => {
let _ = child.start_kill();
child.wait().await?
}
};
socket
.send(SuperExecResult {
exit_code: exit_status.code().unwrap_or(127),
@ -282,7 +398,15 @@ mod tests {
use codex_utils_absolute_path::AbsolutePathBuf;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
use std::os::fd::FromRawFd;
use std::path::PathBuf;
use std::sync::LazyLock;
use tempfile::TempDir;
use tokio::time::Instant;
use tokio::time::sleep;
static ESCALATE_SERVER_TEST_LOCK: LazyLock<tokio::sync::Mutex<()>> =
LazyLock::new(|| tokio::sync::Mutex::new(()));
struct DeterministicEscalationPolicy {
decision: EscalationDecision,
@ -327,8 +451,9 @@ mod tests {
&self,
_command: Vec<String>,
_cwd: PathBuf,
_env: HashMap<String, String>,
_env_overlay: HashMap<String, String>,
_cancel_rx: CancellationToken,
_after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> anyhow::Result<ExecResult> {
unreachable!("run() is not used by handle_escalate_session_with_policy() tests")
}
@ -362,8 +487,9 @@ mod tests {
&self,
_command: Vec<String>,
_cwd: PathBuf,
_env: HashMap<String, String>,
_env_overlay: HashMap<String, String>,
_cancel_rx: CancellationToken,
_after_spawn: Option<Box<dyn FnOnce() + Send>>,
) -> anyhow::Result<ExecResult> {
unreachable!("run() is not used by handle_escalate_session_with_policy() tests")
}
@ -391,8 +517,160 @@ mod tests {
}
}
/// Polls for `pid_file` to contain a parseable PID, for up to five seconds.
///
/// The writer side is a shell doing `echo $$ > file`, which creates the
/// file before the pid is written. An empty or partially written file must
/// therefore be treated as "not ready yet" and retried — previously a
/// parse failure aborted the helper immediately, making the test flaky.
///
/// # Errors
/// Returns an error if no parseable pid appears before the deadline.
async fn wait_for_pid_file(pid_file: &std::path::Path) -> anyhow::Result<i32> {
    let deadline = Instant::now() + Duration::from_secs(5);
    loop {
        if let Ok(contents) = std::fs::read_to_string(pid_file) {
            // Retry on parse failure: the file may exist but not yet hold
            // the pid when we race the redirecting shell.
            if let Ok(pid) = contents.trim().parse() {
                return Ok(pid);
            }
        }
        if Instant::now() >= deadline {
            return Err(anyhow::anyhow!(
                "timed out waiting for pid file {}",
                pid_file.display()
            ));
        }
        sleep(Duration::from_millis(20)).await;
    }
}
fn process_exists(pid: i32) -> bool {
let rc = unsafe { libc::kill(pid, 0) };
if rc == 0 {
return true;
}
std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
}
/// One-shot shell executor used to prove that `exec()` installs an
/// after-spawn hook which closes the parent's copy of the inherited
/// escalation socket.
struct AfterSpawnAssertingShellCommandExecutor;

#[async_trait::async_trait]
impl ShellCommandExecutor for AfterSpawnAssertingShellCommandExecutor {
    async fn run(
        &self,
        _command: Vec<String>,
        _cwd: PathBuf,
        env_overlay: HashMap<String, String>,
        _cancel_rx: CancellationToken,
        after_spawn: Option<Box<dyn FnOnce() + Send>>,
    ) -> anyhow::Result<ExecResult> {
        let fd: i32 = env_overlay
            .get(ESCALATE_SOCKET_ENV_VAR)
            .expect("session should export shell escalation socket")
            .parse()?;
        // Before the hook fires, the exported socket fd must still be open.
        assert_ne!(unsafe { libc::fcntl(fd, libc::F_GETFD) }, -1);
        let hook = after_spawn.expect("one-shot exec should install an after-spawn hook");
        hook();
        // After the hook, the parent's copy of the fd must be closed.
        assert_eq!(unsafe { libc::fcntl(fd, libc::F_GETFD) }, -1);
        Ok(ExecResult {
            exit_code: 0,
            stdout: String::new(),
            stderr: String::new(),
            output: String::new(),
            duration: Duration::ZERO,
            timed_out: false,
        })
    }

    async fn prepare_escalated_exec(
        &self,
        _program: &AbsolutePathBuf,
        _argv: &[String],
        _workdir: &AbsolutePathBuf,
        _env: HashMap<String, String>,
        _execution: EscalationExecution,
    ) -> anyhow::Result<PreparedExec> {
        unreachable!("prepare_escalated_exec() is not used by exec() tests")
    }
}
/// Polls until the process with `pid` no longer exists.
///
/// # Errors
/// Returns an error if the process is still alive after five seconds.
async fn wait_for_process_exit(pid: i32) -> anyhow::Result<()> {
    let give_up_at = Instant::now() + Duration::from_secs(5);
    while process_exists(pid) {
        if Instant::now() >= give_up_at {
            return Err(anyhow::anyhow!("timed out waiting for pid {pid} to exit"));
        }
        sleep(Duration::from_millis(20)).await;
    }
    Ok(())
}
/// Verifies that `start_session()` returns only the wrapper/socket env
/// overlay without touching the configured shell or wrapper executables.
///
/// `/bin/bash` and `/tmp/codex-execve-wrapper` are deliberately fake
/// sentinels: the test only asserts they are copied into the exported
/// environment, and that the exported socket fd stays open until
/// `close_client_socket()` runs.
#[tokio::test]
async fn start_session_exposes_wrapper_env_overlay() -> anyhow::Result<()> {
    let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
    let wrapper_path = PathBuf::from("/tmp/codex-execve-wrapper");
    let expected_wrapper = wrapper_path.to_string_lossy().to_string();
    let server = EscalateServer::new(
        PathBuf::from("/bin/bash"),
        wrapper_path,
        DeterministicEscalationPolicy {
            decision: EscalationDecision::run(),
        },
    );
    let session = server.start_session(
        CancellationToken::new(),
        Arc::new(ForwardingShellCommandExecutor),
    )?;
    let overlay = session.env();
    assert_eq!(overlay.get(EXEC_WRAPPER_ENV_VAR), Some(&expected_wrapper));
    assert_eq!(
        overlay.get(LEGACY_BASH_EXEC_WRAPPER_ENV_VAR),
        Some(&expected_wrapper)
    );
    let fd: i32 = overlay
        .get(ESCALATE_SOCKET_ENV_VAR)
        .expect("session should export shell escalation socket")
        .parse()?;
    assert!(fd >= 0);
    // The fd is valid while the session still holds the client endpoint...
    assert_ne!(unsafe { libc::fcntl(fd, libc::F_GETFD) }, -1);
    session.close_client_socket();
    // ...and gone once the session explicitly closes it.
    assert_eq!(unsafe { libc::fcntl(fd, libc::F_GETFD) }, -1);
    Ok(())
}
/// Runs a one-shot `exec()` with an executor that asserts the after-spawn
/// hook closes the parent's copy of the escalation socket.
#[tokio::test]
async fn exec_closes_parent_socket_after_shell_spawn() -> anyhow::Result<()> {
    let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
    let server = EscalateServer::new(
        PathBuf::from("/bin/bash"),
        PathBuf::from("/tmp/codex-execve-wrapper"),
        DeterministicEscalationPolicy {
            decision: EscalationDecision::run(),
        },
    );
    let workdir = AbsolutePathBuf::current_dir()?
        .to_string_lossy()
        .to_string();
    let params = ExecParams {
        command: "true".to_string(),
        workdir,
        timeout_ms: None,
        login: Some(false),
    };
    let result = server
        .exec(
            params,
            CancellationToken::new(),
            Arc::new(AfterSpawnAssertingShellCommandExecutor),
        )
        .await?;
    assert_eq!(result.exit_code, 0);
    Ok(())
}
#[tokio::test]
async fn handle_escalate_session_respects_run_in_sandbox_decision() -> anyhow::Result<()> {
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
let (server, client) = AsyncSocket::pair()?;
let server_task = tokio::spawn(handle_escalate_session_with_policy(
server,
@ -400,6 +678,8 @@ mod tests {
decision: EscalationDecision::run(),
}),
Arc::new(ForwardingShellCommandExecutor),
CancellationToken::new(),
CancellationToken::new(),
));
let mut env = HashMap::new();
@ -430,6 +710,7 @@ mod tests {
#[tokio::test]
async fn handle_escalate_session_resolves_relative_file_against_request_workdir()
-> anyhow::Result<()> {
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
let (server, client) = AsyncSocket::pair()?;
let tmp = tempfile::TempDir::new()?;
let workdir = tmp.path().join("workspace");
@ -443,6 +724,8 @@ mod tests {
expected_workdir: workdir.clone(),
}),
Arc::new(ForwardingShellCommandExecutor),
CancellationToken::new(),
CancellationToken::new(),
));
client
@ -466,6 +749,7 @@ mod tests {
#[tokio::test]
async fn handle_escalate_session_executes_escalated_command() -> anyhow::Result<()> {
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
let (server, client) = AsyncSocket::pair()?;
let server_task = tokio::spawn(handle_escalate_session_with_policy(
server,
@ -473,6 +757,8 @@ mod tests {
decision: EscalationDecision::escalate(EscalationExecution::Unsandboxed),
}),
Arc::new(ForwardingShellCommandExecutor),
CancellationToken::new(),
CancellationToken::new(),
));
client
@ -508,6 +794,7 @@ mod tests {
#[tokio::test]
async fn handle_escalate_session_passes_permissions_to_executor() -> anyhow::Result<()> {
let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
let (server, client) = AsyncSocket::pair()?;
let server_task = tokio::spawn(handle_escalate_session_with_policy(
server,
@ -529,6 +816,8 @@ mod tests {
..Default::default()
}),
}),
CancellationToken::new(),
CancellationToken::new(),
));
client
@ -557,4 +846,95 @@ mod tests {
server_task.await?
}
/// End-to-end teardown test: plays the exec wrapper's side of the
/// handshake against a live `EscalationSession`, gets an already-approved
/// escalated child running, then drops the session and asserts both the
/// intercept worker and the spawned child are torn down.
#[tokio::test]
async fn dropping_session_aborts_intercept_workers_and_kills_spawned_child()
-> anyhow::Result<()> {
    let _guard = ESCALATE_SERVER_TEST_LOCK.lock().await;
    let tmp = TempDir::new()?;
    // The escalated child writes its own pid here so the test can track it.
    let pid_file = tmp.path().join("escalated-child.pid");
    let pid_file_display = pid_file.display().to_string();
    // The path is spliced into a single-quoted shell string below, so a
    // quote in the temp path would break the command.
    assert!(
        !pid_file_display.contains('\''),
        "test temp path should not contain single quotes: {pid_file_display}"
    );
    let server = EscalateServer::new(
        PathBuf::from("/bin/bash"),
        PathBuf::from("/tmp/codex-execve-wrapper"),
        DeterministicEscalationPolicy {
            decision: EscalationDecision::escalate(EscalationExecution::Unsandboxed),
        },
    );
    let session = server.start_session(
        CancellationToken::new(),
        Arc::new(ForwardingShellCommandExecutor),
    )?;
    let socket_fd = session
        .env()
        .get(ESCALATE_SOCKET_ENV_VAR)
        .expect("session should export shell escalation socket")
        .parse::<i32>()?;
    // dup() the exported fd so the test can act as the wrapper without
    // consuming the session's own client endpoint.
    let dup_socket_fd = unsafe { libc::dup(socket_fd) };
    assert!(dup_socket_fd >= 0, "expected dup() to succeed");
    let handshake_client = unsafe { AsyncDatagramSocket::from_raw_fd(dup_socket_fd) }?;
    let (server_stream, client_stream) = AsyncSocket::pair()?;
    // Keep one local reference to the server end alive until the worker has
    // responded once. Without that guard, macOS can observe EOF on the
    // client side before the transferred fd is fully servicing the stream.
    let server_stream_guard = server_stream.into_inner();
    let dup_server_stream_fd = unsafe { libc::dup(server_stream_guard.as_raw_fd()) };
    assert!(
        dup_server_stream_fd >= 0,
        "expected dup() of server stream to succeed"
    );
    let server_stream_fd = unsafe { std::os::fd::OwnedFd::from_raw_fd(dup_server_stream_fd) };
    // Handshake: transfer the stream fd over the datagram socket so the
    // server's intercept worker adopts it.
    handshake_client
        .send_with_fds(&[0], &[server_stream_fd])
        .await
        .context("failed to send handshake datagram")?;
    client_stream
        .send(EscalateRequest {
            file: PathBuf::from("/bin/sh"),
            argv: vec![
                "sh".to_string(),
                "-c".to_string(),
                // Record the child's pid, then become a long-lived sleep so
                // teardown has a live process to kill.
                format!("echo $$ > '{pid_file_display}' && exec /bin/sleep 100"),
            ],
            workdir: AbsolutePathBuf::current_dir()?,
            env: HashMap::new(),
        })
        .await
        .context("failed to send EscalateRequest")?;
    let response = client_stream
        .receive::<EscalateResponse>()
        .await
        .context("failed to receive EscalateResponse")?;
    assert_eq!(
        EscalateResponse {
            action: EscalateAction::Escalate,
        },
        response
    );
    drop(server_stream_guard);
    // No fds to remap: the SuperExecMessage simply green-lights the spawn.
    client_stream
        .send_with_fds(SuperExecMessage { fds: Vec::new() }, &[])
        .await
        .context("failed to send SuperExecMessage")?;
    let pid = wait_for_pid_file(&pid_file).await?;
    assert!(
        process_exists(pid),
        "expected spawned child pid {pid} to exist"
    );
    // Dropping the session must cancel the intercept worker and kill the
    // already-approved child rather than orphan it.
    drop(session);
    wait_for_process_exit(pid).await?;
    Ok(())
}
}

View file

@ -66,6 +66,7 @@ pub use self::escalate_protocol::EscalateAction;
pub use self::escalate_protocol::EscalationDecision;
pub use self::escalate_protocol::EscalationExecution;
pub use self::escalate_server::EscalateServer;
pub use self::escalate_server::EscalationSession;
pub use self::escalate_server::ExecParams;
pub use self::escalate_server::ExecResult;
pub use self::escalate_server::PreparedExec;

View file

@ -9,7 +9,7 @@ use tokio_util::sync::CancellationToken;
#[derive(Clone, Debug)]
pub struct Stopwatch {
limit: Duration,
limit: Option<Duration>,
inner: Arc<Mutex<StopwatchState>>,
notify: Arc<Notify>,
}
@ -30,13 +30,27 @@ impl Stopwatch {
active_pauses: 0,
})),
notify: Arc::new(Notify::new()),
limit,
limit: Some(limit),
}
}
/// Creates a stopwatch with no elapsed-time limit; tokens handed out by
/// `cancellation_token()` are never cancelled by the stopwatch itself.
pub fn unlimited() -> Self {
    let state = StopwatchState {
        elapsed: Duration::ZERO,
        running_since: Some(Instant::now()),
        active_pauses: 0,
    };
    Self {
        inner: Arc::new(Mutex::new(state)),
        notify: Arc::new(Notify::new()),
        limit: None,
    }
}
pub fn cancellation_token(&self) -> CancellationToken {
let limit = self.limit;
let token = CancellationToken::new();
let Some(limit) = self.limit else {
return token;
};
let cancel = token.clone();
let inner = Arc::clone(&self.inner);
let notify = Arc::clone(&self.notify);
@ -208,4 +222,16 @@ mod tests {
// Now the stopwatch should resume and hit the limit shortly after.
token.cancelled().await;
}
/// An unlimited stopwatch must never cancel its token on its own: waiting
/// on `cancelled()` should outlive a short timeout.
#[tokio::test]
async fn unlimited_stopwatch_never_cancels() {
    let token = Stopwatch::unlimited().cancellation_token();
    let waited = timeout(Duration::from_millis(30), token.cancelled()).await;
    assert!(waited.is_err(), "token should not cancel within the timeout");
}
}