core-agent-ide/codex-rs/utils/pty/src/pipe.rs
Michael Bolin b77fe8fefe
Apply argument comment lint across codex-rs (#14652)
## Why

Once the repo-local lint exists, `codex-rs` needs to follow the
checked-in convention and CI needs to keep it from drifting. This commit
applies the fallback `/*param*/` style consistently across existing
positional literal call sites without changing those APIs.

The longer-term preference is still to avoid APIs that require comments
by choosing clearer parameter types and call shapes. This PR is
intentionally the mechanical follow-through for the places where the
existing signatures stay in place.

After rebasing onto newer `main`, the rollout also had to cover newly
introduced `tui_app_server` call sites. That made it clear the first cut
of the CI job was too expensive for the common path: it was spending
almost as much time installing `cargo-dylint` and re-testing the lint
crate as a representative test job spends running product tests. The CI
update keeps the full workspace enforcement but trims that extra
overhead from ordinary `codex-rs` PRs.

## What changed

- keep a dedicated `argument_comment_lint` job in `rust-ci`
- mechanically annotate remaining opaque positional literals across
`codex-rs` with exact `/*param*/` comments, including the rebased
`tui_app_server` call sites that now fall under the lint
- keep the checked-in style aligned with the lint policy by using
`/*param*/` and leaving string and char literals uncommented
- cache `cargo-dylint`, `dylint-link`, and the relevant Cargo
registry/git metadata in the lint job
- split changed-path detection so the lint crate's own `cargo test` step
runs only when `tools/argument-comment-lint/*` or `rust-ci.yml` changes
- continue to run the repo wrapper over the `codex-rs` workspace, so
product-code enforcement is unchanged

Most of the code changes in this commit are intentionally mechanical
comment rewrites or insertions driven by the lint itself.

## Verification

- `./tools/argument-comment-lint/run.sh --workspace`
- `cargo test -p codex-tui-app-server -p codex-tui`
- parsed `.github/workflows/rust-ci.yml` locally with PyYAML

---

* -> #14652
* #14651
2026-03-16 16:48:15 -07:00

294 lines
7.9 KiB
Rust

use std::collections::HashMap;
use std::io;
use std::io::ErrorKind;
use std::path::Path;
use std::process::Stdio;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use anyhow::Result;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
use tokio::io::BufReader;
use tokio::process::Command;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio::task::JoinHandle;
use crate::process::ChildTerminator;
use crate::process::ProcessHandle;
use crate::process::SpawnedProcess;
#[cfg(target_os = "linux")]
use libc;
struct PipeChildTerminator {
#[cfg(windows)]
pid: u32,
#[cfg(unix)]
process_group_id: u32,
}
impl ChildTerminator for PipeChildTerminator {
fn kill(&mut self) -> io::Result<()> {
#[cfg(unix)]
{
crate::process_group::kill_process_group(self.process_group_id)
}
#[cfg(windows)]
{
kill_process(self.pid)
}
#[cfg(not(any(unix, windows)))]
{
Ok(())
}
}
}
#[cfg(windows)]
fn kill_process(pid: u32) -> io::Result<()> {
unsafe {
let handle = winapi::um::processthreadsapi::OpenProcess(
winapi::um::winnt::PROCESS_TERMINATE,
0,
pid,
);
if handle.is_null() {
return Err(io::Error::last_os_error());
}
let success = winapi::um::processthreadsapi::TerminateProcess(handle, 1);
let err = io::Error::last_os_error();
winapi::um::handleapi::CloseHandle(handle);
if success == 0 {
Err(err)
} else {
Ok(())
}
}
}
async fn read_output_stream<R>(mut reader: R, output_tx: mpsc::Sender<Vec<u8>>)
where
R: AsyncRead + Unpin,
{
let mut buf = vec![0u8; 8_192];
loop {
match reader.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
let _ = output_tx.send(buf[..n].to_vec()).await;
}
Err(ref e) if e.kind() == ErrorKind::Interrupted => continue,
Err(_) => break,
}
}
}
#[derive(Clone, Copy)]
enum PipeStdinMode {
Piped,
Null,
}
async fn spawn_process_with_stdin_mode(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
stdin_mode: PipeStdinMode,
inherited_fds: &[i32],
) -> Result<SpawnedProcess> {
if program.is_empty() {
anyhow::bail!("missing program for pipe spawn");
}
#[cfg(not(unix))]
let _ = inherited_fds;
let mut command = Command::new(program);
#[cfg(unix)]
if let Some(arg0) = arg0 {
command.arg0(arg0);
}
#[cfg(target_os = "linux")]
let parent_pid = unsafe { libc::getpid() };
#[cfg(unix)]
let inherited_fds = inherited_fds.to_vec();
#[cfg(unix)]
unsafe {
command.pre_exec(move || {
crate::process_group::detach_from_tty()?;
#[cfg(target_os = "linux")]
crate::process_group::set_parent_death_signal(parent_pid)?;
crate::pty::close_inherited_fds_except(&inherited_fds);
Ok(())
});
}
#[cfg(not(unix))]
let _ = arg0;
command.current_dir(cwd);
command.env_clear();
for (key, value) in env {
command.env(key, value);
}
for arg in args {
command.arg(arg);
}
match stdin_mode {
PipeStdinMode::Piped => {
command.stdin(Stdio::piped());
}
PipeStdinMode::Null => {
command.stdin(Stdio::null());
}
}
command.stdout(Stdio::piped());
command.stderr(Stdio::piped());
let mut child = command.spawn()?;
let pid = child
.id()
.ok_or_else(|| io::Error::other("missing child pid"))?;
#[cfg(unix)]
let process_group_id = pid;
let stdin = child.stdin.take();
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let (writer_tx, mut writer_rx) = mpsc::channel::<Vec<u8>>(128);
let (stdout_tx, stdout_rx) = mpsc::channel::<Vec<u8>>(128);
let (stderr_tx, stderr_rx) = mpsc::channel::<Vec<u8>>(128);
let writer_handle = if let Some(stdin) = stdin {
let writer = Arc::new(tokio::sync::Mutex::new(stdin));
tokio::spawn(async move {
while let Some(bytes) = writer_rx.recv().await {
let mut guard = writer.lock().await;
let _ = guard.write_all(&bytes).await;
let _ = guard.flush().await;
}
})
} else {
drop(writer_rx);
tokio::spawn(async {})
};
let stdout_handle = stdout.map(|stdout| {
let stdout_tx = stdout_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stdout), stdout_tx).await;
})
});
let stderr_handle = stderr.map(|stderr| {
let stderr_tx = stderr_tx.clone();
tokio::spawn(async move {
read_output_stream(BufReader::new(stderr), stderr_tx).await;
})
});
let mut reader_abort_handles = Vec::new();
if let Some(handle) = stdout_handle.as_ref() {
reader_abort_handles.push(handle.abort_handle());
}
if let Some(handle) = stderr_handle.as_ref() {
reader_abort_handles.push(handle.abort_handle());
}
let reader_handle = tokio::spawn(async move {
if let Some(handle) = stdout_handle {
let _ = handle.await;
}
if let Some(handle) = stderr_handle {
let _ = handle.await;
}
});
let (exit_tx, exit_rx) = oneshot::channel::<i32>();
let exit_status = Arc::new(AtomicBool::new(false));
let wait_exit_status = Arc::clone(&exit_status);
let exit_code = Arc::new(StdMutex::new(None));
let wait_exit_code = Arc::clone(&exit_code);
let wait_handle: JoinHandle<()> = tokio::spawn(async move {
let code = match child.wait().await {
Ok(status) => status.code().unwrap_or(-1),
Err(_) => -1,
};
wait_exit_status.store(true, std::sync::atomic::Ordering::SeqCst);
if let Ok(mut guard) = wait_exit_code.lock() {
*guard = Some(code);
}
let _ = exit_tx.send(code);
});
let handle = ProcessHandle::new(
writer_tx,
Box::new(PipeChildTerminator {
#[cfg(windows)]
pid,
#[cfg(unix)]
process_group_id,
}),
reader_handle,
reader_abort_handles,
writer_handle,
wait_handle,
exit_status,
exit_code,
/*pty_handles*/ None,
);
Ok(SpawnedProcess {
session: handle,
stdout_rx,
stderr_rx,
exit_rx,
})
}
/// Spawn a process using regular pipes (no PTY), returning handles for stdin, split output, and exit.
pub async fn spawn_process(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(program, args, cwd, env, arg0, PipeStdinMode::Piped, &[]).await
}
/// Spawn a process using regular pipes, but close stdin immediately.
pub async fn spawn_process_no_stdin(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
) -> Result<SpawnedProcess> {
spawn_process_no_stdin_with_inherited_fds(program, args, cwd, env, arg0, &[]).await
}
/// Spawn a process using regular pipes, close stdin immediately, and preserve
/// selected inherited file descriptors across exec on Unix.
pub async fn spawn_process_no_stdin_with_inherited_fds(
program: &str,
args: &[String],
cwd: &Path,
env: &HashMap<String, String>,
arg0: &Option<String>,
inherited_fds: &[i32],
) -> Result<SpawnedProcess> {
spawn_process_with_stdin_mode(
program,
args,
cwd,
env,
arg0,
PipeStdinMode::Null,
inherited_fds,
)
.await
}