core-agent-ide/codex-rs/utils/pty/src/process.rs
Michael Bolin ef37d313c6
fix: preserve zsh-fork escalation fds across unified-exec spawn paths (#13644)
## Why

`zsh-fork` sessions launched through unified-exec need the escalation
socket to survive the wrapper -> server -> child handoff so later
intercepted `exec()` calls can still reach the escalation server.

The inherited-fd spawn path also needs to avoid closing Rust's internal
exec-error pipe, and the shell-escalation handoff needs to tolerate the
receive-side case where a transferred fd is installed into the same
stdio slot it will be mapped onto.

## What Changed

- Added `SpawnLifecycle::inherited_fds()` in
`codex-rs/core/src/unified_exec/process.rs` and threaded inherited fds
through `codex-rs/core/src/unified_exec/process_manager.rs` so
unified-exec can preserve required descriptors across both PTY and
no-stdin pipe spawn paths.
- Updated `codex-rs/core/src/tools/runtimes/shell/zsh_fork_backend.rs`
to expose the escalation socket fd through the spawn lifecycle.
- Added inherited-fd-aware spawn helpers in
`codex-rs/utils/pty/src/pty.rs` and `codex-rs/utils/pty/src/pipe.rs`,
including Unix pre-exec fd pruning that preserves requested inherited
fds while leaving `FD_CLOEXEC` descriptors alone. The pruning helper is
now named `close_inherited_fds_except()` to better describe that
behavior.
- Updated `codex-rs/shell-escalation/src/unix/escalate_client.rs` to
duplicate local stdio before transfer and send destination stdio numbers
in `SuperExecMessage`, so the wrapper keeps using its own
`stdin`/`stdout`/`stderr` until the escalated child takes over.
- Updated `codex-rs/shell-escalation/src/unix/escalate_server.rs` so the
server accepts the overlap case where a received fd reuses the same
stdio descriptor number that the child setup will target with `dup2`.
- Added comments around the PTY stdio wiring and the overlap regression
helper to make the fd handoff and controlling-terminal setup easier to
follow.

## Verification

- `cargo test -p codex-utils-pty`
  - covers preserved-fd PTY spawn behavior, PTY resize, Python REPL
    continuity, exec-failure reporting, and the no-stdin pipe path
- `cargo test -p codex-shell-escalation`
  - covers duplicated-fd transfer on the client side and verifies the
    overlap case by passing a pipe-backed stdin payload through the
    server-side `dup2` path

---
[//]: # (BEGIN SAPLING FOOTER)
Stack created with [Sapling](https://sapling-scm.com). Best reviewed
with [ReviewStack](https://reviewstack.dev/openai/codex/pull/13644).
* #14624
* __->__ #13644
2026-03-13 20:25:31 +00:00

265 lines
7.7 KiB
Rust

use core::fmt;
use std::io;
#[cfg(unix)]
use std::os::fd::RawFd;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use anyhow::anyhow;
use portable_pty::MasterPty;
use portable_pty::PtySize;
use portable_pty::SlavePty;
use tokio::sync::broadcast;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::AbortHandle;
use tokio::task::JoinHandle;
/// Crate-internal abstraction over an object that can kill a spawned child.
///
/// Implementors must be `Send + Sync`; `ProcessHandle` stores one boxed
/// instance and invokes it from `request_terminate`.
pub(crate) trait ChildTerminator: Send + Sync {
/// Attempts to kill the child, returning any I/O error that occurred.
fn kill(&mut self) -> io::Result<()>;
}
/// Terminal dimensions expressed in character cells.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct TerminalSize {
    /// Number of rows.
    pub rows: u16,
    /// Number of columns.
    pub cols: u16,
}

impl Default for TerminalSize {
    /// Returns a terminal of 24 rows by 80 columns.
    fn default() -> Self {
        let (rows, cols) = (24, 80);
        TerminalSize { rows, cols }
    }
}
impl From<TerminalSize> for PtySize {
fn from(value: TerminalSize) -> Self {
Self {
rows: value.rows,
cols: value.cols,
pixel_width: 0,
pixel_height: 0,
}
}
}
/// Marker trait with no methods, used as the trait object type of
/// `PtyMasterHandle::Opaque::_handle`.
// NOTE(review): the name suggests the boxed value exists only so its drop is
// deferred; confirm against the code that constructs `Opaque`.
#[cfg(unix)]
pub(crate) trait PtyHandleKeepAlive: Send {}
/// Blanket implementation: every `Send` type, sized or not, is a
/// `PtyHandleKeepAlive`.
#[cfg(unix)]
impl<T: Send + ?Sized> PtyHandleKeepAlive for T {}
/// Master side of a PTY, in one of two representations.
pub(crate) enum PtyMasterHandle {
/// A `portable_pty` master; `ProcessHandle::resize` calls `MasterPty::resize` on it.
Resizable(Box<dyn MasterPty + Send>),
/// Unix only: a raw descriptor plus an opaque boxed value;
/// `ProcessHandle::resize` resizes it through `resize_raw_pty`.
#[cfg(unix)]
Opaque {
/// Descriptor handed to `resize_raw_pty`.
raw_fd: RawFd,
/// Never read in this file; presumably owns whatever keeps `raw_fd`
/// open — TODO confirm against the constructor.
_handle: Box<dyn PtyHandleKeepAlive>,
},
}
/// PTY endpoints stored inside a `ProcessHandle`.
pub struct PtyHandles {
/// Slave side of the PTY, when one is retained.
pub _slave: Option<Box<dyn SlavePty + Send>>,
/// Master side of the PTY; consulted by `ProcessHandle::resize`.
pub(crate) _master: PtyMasterHandle,
}
impl fmt::Debug for PtyHandles {
    /// Writes only the type name; none of the wrapped PTY handles are rendered.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("PtyHandles");
        // No fields are registered, so the output is just the struct name.
        builder.finish()
    }
}
/// Handle for driving an interactive process (PTY or pipe).
///
/// Every field sits behind a mutex or an `Arc`, so all methods take `&self`.
pub struct ProcessHandle {
/// Sender for bytes destined for the child's stdin; set to `None` by `close_stdin`.
writer_tx: StdMutex<Option<mpsc::Sender<Vec<u8>>>>,
/// Kills the child; taken (leaving `None`) by the first `request_terminate` call.
killer: StdMutex<Option<Box<dyn ChildTerminator>>>,
/// Join handle aborted by `terminate`.
reader_handle: StdMutex<Option<JoinHandle<()>>>,
/// Extra abort handles, all drained and aborted by `terminate`.
reader_abort_handles: StdMutex<Vec<AbortHandle>>,
/// Join handle aborted by `terminate`.
writer_handle: StdMutex<Option<JoinHandle<()>>>,
/// Join handle aborted by `terminate`.
wait_handle: StdMutex<Option<JoinHandle<()>>>,
/// Flag read by `has_exited`.
exit_status: Arc<AtomicBool>,
/// Value returned by `exit_code`; `None` while no code is known.
exit_code: Arc<StdMutex<Option<i32>>>,
// PtyHandles must be preserved because the process will receive Control+C if the
// slave is closed
_pty_handles: StdMutex<Option<PtyHandles>>,
}
impl fmt::Debug for ProcessHandle {
    /// Writes only the type name; channels, task handles and PTY state are not rendered.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("ProcessHandle");
        // No fields are registered, so the output is just the struct name.
        builder.finish()
    }
}
impl ProcessHandle {
    /// Assembles a handle from the pieces produced while spawning a process.
    ///
    /// The stdin sender, the killer and the task handles are each wrapped in a
    /// mutex-guarded `Option` so they can later be taken out through `&self`.
    /// `pty_handles` is `None` when the process is not attached to a PTY.
    // One argument per stored field.
    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        writer_tx: mpsc::Sender<Vec<u8>>,
        killer: Box<dyn ChildTerminator>,
        reader_handle: JoinHandle<()>,
        reader_abort_handles: Vec<AbortHandle>,
        writer_handle: JoinHandle<()>,
        wait_handle: JoinHandle<()>,
        exit_status: Arc<AtomicBool>,
        exit_code: Arc<StdMutex<Option<i32>>>,
        pty_handles: Option<PtyHandles>,
    ) -> Self {
        let writer_tx = StdMutex::new(Some(writer_tx));
        let killer = StdMutex::new(Some(killer));
        let reader_handle = StdMutex::new(Some(reader_handle));
        let reader_abort_handles = StdMutex::new(reader_abort_handles);
        let writer_handle = StdMutex::new(Some(writer_handle));
        let wait_handle = StdMutex::new(Some(wait_handle));
        let _pty_handles = StdMutex::new(pty_handles);

        Self {
            writer_tx,
            killer,
            reader_handle,
            reader_abort_handles,
            writer_handle,
            wait_handle,
            exit_status,
            exit_code,
            _pty_handles,
        }
    }

    /// Returns a channel sender for writing raw bytes to the child stdin.
    ///
    /// If stdin has already been closed (or the lock is poisoned), the returned
    /// sender belongs to a channel whose receiver has already been dropped.
    pub fn writer_sender(&self) -> mpsc::Sender<Vec<u8>> {
        let live_sender = match self.writer_tx.lock() {
            Ok(slot) => (*slot).clone(),
            Err(_) => None,
        };
        live_sender.unwrap_or_else(|| {
            let (closed_tx, closed_rx) = mpsc::channel(1);
            drop(closed_rx);
            closed_tx
        })
    }

    /// True if the child process has exited.
    pub fn has_exited(&self) -> bool {
        use std::sync::atomic::Ordering;

        self.exit_status.load(Ordering::SeqCst)
    }

    /// Returns the exit code if known.
    ///
    /// Yields `None` when no code has been stored or the lock is poisoned.
    pub fn exit_code(&self) -> Option<i32> {
        match self.exit_code.lock() {
            Ok(code) => *code,
            Err(_) => None,
        }
    }

    /// Resize the PTY in character cells.
    ///
    /// # Errors
    ///
    /// Fails when the PTY handles cannot be locked, when the process has no
    /// PTY attached, or when the underlying resize call reports an error.
    pub fn resize(&self, size: TerminalSize) -> anyhow::Result<()> {
        let guard = match self._pty_handles.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(anyhow!("failed to lock PTY handles")),
        };
        let pty = match guard.as_ref() {
            Some(pty) => pty,
            None => return Err(anyhow!("process is not attached to a PTY")),
        };
        match &pty._master {
            PtyMasterHandle::Resizable(master) => master.resize(size.into()),
            #[cfg(unix)]
            PtyMasterHandle::Opaque { raw_fd, .. } => resize_raw_pty(*raw_fd, size),
        }
    }

    /// Close the child's stdin channel.
    ///
    /// Drops the stored sender; later `writer_sender` calls return a closed sender.
    pub fn close_stdin(&self) {
        if let Ok(mut slot) = self.writer_tx.lock() {
            *slot = None;
        }
    }

    /// Attempts to kill the child while leaving the reader/writer tasks alive
    /// so callers can still drain output until EOF.
    ///
    /// The killer is consumed on first use, so repeated calls are no-ops.
    pub fn request_terminate(&self) {
        let mut slot = match self.killer.lock() {
            Ok(slot) => slot,
            Err(_) => return,
        };
        if let Some(mut killer) = slot.take() {
            // Best effort: an error from `kill` is deliberately discarded.
            let _ = killer.kill();
        }
    }

    /// Attempts to kill the child and abort helper tasks.
    ///
    /// The kill request is issued first, then the reader task, the extra
    /// reader abort handles, the writer task and the wait task are aborted.
    pub fn terminate(&self) {
        self.request_terminate();
        Self::abort_task(&self.reader_handle);
        if let Ok(mut extra) = self.reader_abort_handles.lock() {
            extra.drain(..).for_each(|handle| handle.abort());
        }
        Self::abort_task(&self.writer_handle);
        Self::abort_task(&self.wait_handle);
    }

    /// Takes the join handle out of `task_slot`, if any, and aborts it.
    ///
    /// Does nothing when the slot is empty or its lock is poisoned.
    fn abort_task(task_slot: &StdMutex<Option<JoinHandle<()>>>) {
        if let Ok(mut slot) = task_slot.lock() {
            if let Some(task) = slot.take() {
                task.abort();
            }
        }
    }
}
impl Drop for ProcessHandle {
fn drop(&mut self) {
self.terminate();
}
}
#[cfg(unix)]
fn resize_raw_pty(raw_fd: RawFd, size: TerminalSize) -> anyhow::Result<()> {
let mut winsize = libc::winsize {
ws_row: size.rows,
ws_col: size.cols,
ws_xpixel: 0,
ws_ypixel: 0,
};
let result = unsafe { libc::ioctl(raw_fd, libc::TIOCSWINSZ, &mut winsize) };
if result == -1 {
return Err(std::io::Error::last_os_error().into());
}
Ok(())
}
/// Combine split stdout/stderr receivers into a single broadcast receiver.
///
/// A task is spawned with `tokio::spawn` that forwards every chunk arriving on
/// either input into a broadcast channel of capacity 256. The task stops once
/// both inputs have closed. Errors from the broadcast send are ignored, so the
/// inputs keep being drained even if nobody is listening.
pub fn combine_output_receivers(
    mut stdout_rx: mpsc::Receiver<Vec<u8>>,
    mut stderr_rx: mpsc::Receiver<Vec<u8>>,
) -> broadcast::Receiver<Vec<u8>> {
    let (fanout_tx, fanout_rx) = broadcast::channel(256);

    tokio::spawn(async move {
        // Each flag turns false once the matching input yields `None`, which
        // disables that `select!` branch from then on.
        let mut stdout_live = true;
        let mut stderr_live = true;

        loop {
            tokio::select! {
                maybe_chunk = stdout_rx.recv(), if stdout_live => {
                    if let Some(bytes) = maybe_chunk {
                        let _ = fanout_tx.send(bytes);
                    } else {
                        stdout_live = false;
                    }
                },
                maybe_chunk = stderr_rx.recv(), if stderr_live => {
                    if let Some(bytes) = maybe_chunk {
                        let _ = fanout_tx.send(bytes);
                    } else {
                        stderr_live = false;
                    }
                },
                // Reached only when both branches are disabled.
                else => break,
            }
        }
    });

    fanout_rx
}
/// Return value from PTY or pipe spawn helpers.
#[derive(Debug)]
pub struct SpawnedProcess {
/// Handle used to write to, resize, and terminate the process.
pub session: ProcessHandle,
/// Receiver of byte chunks for stdout.
pub stdout_rx: mpsc::Receiver<Vec<u8>>,
/// Receiver of byte chunks for stderr.
pub stderr_rx: mpsc::Receiver<Vec<u8>>,
/// One-shot receiver of an `i32`; presumably the child's exit code —
/// TODO confirm against the spawn helpers.
pub exit_rx: oneshot::Receiver<i32>,
}