From 09aa71adb7a642408f05fe51db82854142e00945 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Thu, 12 Mar 2026 09:52:50 -0700 Subject: [PATCH] Fix stdio-to-uds peer-close flake (#13882) ## What changed - `codex-stdio-to-uds` now tolerates `NotConnected` when `shutdown(Write)` happens after the peer has already closed. - The socket test was rewritten to send stdin from a fixture file and to read an exact request payload length instead of waiting on EOF timing. ## Why this fixes the flake - This flake exposed a real platform-dependent runtime edge case: on macOS, the peer can close first after a successful exchange, and `shutdown(Write)` can report `NotConnected` even though the interaction already succeeded. - Treating that specific ordering as a harmless shutdown condition removes the production-level false failure. - The old test compounded the problem by depending on EOF timing, which varies by platform and scheduler. Exact-length I/O makes the test deterministic and focused on the actual data exchange. ## Scope - Production logic change with matching test rewrite. 
--- codex-rs/Cargo.lock | 3 +- codex-rs/stdio-to-uds/Cargo.toml | 1 - codex-rs/stdio-to-uds/src/lib.rs | 10 +- codex-rs/stdio-to-uds/tests/stdio_to_uds.rs | 101 +++++++++++++++++--- 4 files changed, 98 insertions(+), 17 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 097fef2af..79bfc73ad 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1439,7 +1439,6 @@ dependencies = [ "codex-utils-cargo-bin", "codex-utils-cli", "codex-utils-json-to-toml", - "codex-utils-pty", "core_test_support", "futures", "opentelemetry", @@ -2439,6 +2438,7 @@ dependencies = [ "anyhow", "chrono", "clap", + "codex-otel", "codex-protocol", "dirs", "log", @@ -2458,7 +2458,6 @@ name = "codex-stdio-to-uds" version = "0.0.0" dependencies = [ "anyhow", - "assert_cmd", "codex-utils-cargo-bin", "pretty_assertions", "tempfile", diff --git a/codex-rs/stdio-to-uds/Cargo.toml b/codex-rs/stdio-to-uds/Cargo.toml index 20e3ade72..bc62ee8d2 100644 --- a/codex-rs/stdio-to-uds/Cargo.toml +++ b/codex-rs/stdio-to-uds/Cargo.toml @@ -22,7 +22,6 @@ anyhow = { workspace = true } uds_windows = { workspace = true } [dev-dependencies] -assert_cmd = { workspace = true } codex-utils-cargo-bin = { workspace = true } pretty_assertions = { workspace = true } tempfile = { workspace = true } diff --git a/codex-rs/stdio-to-uds/src/lib.rs b/codex-rs/stdio-to-uds/src/lib.rs index 119068884..9d2cb6c0f 100644 --- a/codex-rs/stdio-to-uds/src/lib.rs +++ b/codex-rs/stdio-to-uds/src/lib.rs @@ -39,9 +39,13 @@ pub fn run(socket_path: &Path) -> anyhow::Result<()> { io::copy(&mut handle, &mut stream).context("failed to copy data from stdin to socket")?; } - stream - .shutdown(Shutdown::Write) - .context("failed to shutdown socket writer")?; + // The peer can close immediately after sending its response; in that race, + // half-closing our write side can report NotConnected on some platforms. 
+ if let Err(err) = stream.shutdown(Shutdown::Write) + && err.kind() != io::ErrorKind::NotConnected + { + return Err(err).context("failed to shutdown socket writer"); + } let stdout_result = stdout_thread .join() diff --git a/codex-rs/stdio-to-uds/tests/stdio_to_uds.rs b/codex-rs/stdio-to-uds/tests/stdio_to_uds.rs index c6062d50d..af8fd5922 100644 --- a/codex-rs/stdio-to-uds/tests/stdio_to_uds.rs +++ b/codex-rs/stdio-to-uds/tests/stdio_to_uds.rs @@ -1,12 +1,15 @@ use std::io::ErrorKind; use std::io::Read; use std::io::Write; +use std::process::Command; +use std::process::Stdio; use std::sync::mpsc; use std::thread; use std::time::Duration; +use std::time::Instant; use anyhow::Context; -use assert_cmd::Command; +use anyhow::anyhow; use pretty_assertions::assert_eq; #[cfg(unix)] @@ -17,8 +20,18 @@ use uds_windows::UnixListener; #[test] fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> { + // This test intentionally avoids `read_to_end()` on the server side because + // waiting for EOF can race with socket half-close behavior on slower runners. + // Reading the exact request length keeps the test deterministic. + // + // We also use `std::process::Command` (instead of `assert_cmd`) so we can + // poll/kill on timeout and include incremental server events + stderr in + // failure output, which makes flaky failures actionable to debug. 
let dir = tempfile::TempDir::new().context("failed to create temp dir")?; let socket_path = dir.path().join("socket"); + let request = b"request"; + let request_path = dir.path().join("request.txt"); + std::fs::write(&request_path, request).context("failed to write child stdin fixture")?; let listener = match UnixListener::bind(&socket_path) { Ok(listener) => listener, Err(err) if err.kind() == ErrorKind::PermissionDenied => { @@ -31,37 +44,103 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> { }; let (tx, rx) = mpsc::channel(); + let (event_tx, event_rx) = mpsc::channel(); let server_thread = thread::spawn(move || -> anyhow::Result<()> { + let _ = event_tx.send("waiting for accept".to_string()); let (mut connection, _) = listener .accept() .context("failed to accept test connection")?; - let mut received = Vec::new(); + let _ = event_tx.send("accepted connection".to_string()); + let mut received = vec![0; request.len()]; connection - .read_to_end(&mut received) + .read_exact(&mut received) .context("failed to read data from client")?; + let _ = event_tx.send(format!("read {} bytes", received.len())); tx.send(received) - .map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?; + .map_err(|_| anyhow!("failed to send received bytes to test thread"))?; connection .write_all(b"response") .context("failed to write response to client")?; + let _ = event_tx.send("wrote response".to_string()); Ok(()) }); - Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?) + let stdin = std::fs::File::open(&request_path).context("failed to open child stdin fixture")?; + let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?) 
.arg(&socket_path) - .write_stdin("request") - .assert() - .success() - .stdout("response"); + .stdin(Stdio::from(stdin)) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .context("failed to spawn codex-stdio-to-uds")?; + + let mut child_stdout = child.stdout.take().context("missing child stdout")?; + let mut child_stderr = child.stderr.take().context("missing child stderr")?; + let (stdout_tx, stdout_rx) = mpsc::channel(); + let (stderr_tx, stderr_rx) = mpsc::channel(); + thread::spawn(move || { + let mut stdout = Vec::new(); + let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout); + let _ = stdout_tx.send(result); + }); + thread::spawn(move || { + let mut stderr = Vec::new(); + let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr); + let _ = stderr_tx.send(result); + }); + + let mut server_events = Vec::new(); + let deadline = Instant::now() + Duration::from_secs(5); + let status = loop { + while let Ok(event) = event_rx.try_recv() { + server_events.push(event); + } + + if let Some(status) = child.try_wait().context("failed to poll child status")? { + break status; + } + + if Instant::now() >= deadline { + let _ = child.kill(); + let _ = child.wait(); + let stderr = stderr_rx + .recv_timeout(Duration::from_secs(1)) + .context("timed out waiting for child stderr after kill")? + .context("failed to read child stderr")?; + anyhow::bail!( + "codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}", + server_events, + String::from_utf8_lossy(&stderr).trim_end() + ); + } + + thread::sleep(Duration::from_millis(25)); + }; + + let stdout = stdout_rx + .recv_timeout(Duration::from_secs(1)) + .context("timed out waiting for child stdout")? + .context("failed to read child stdout")?; + let stderr = stderr_rx + .recv_timeout(Duration::from_secs(1)) + .context("timed out waiting for child stderr")? 
+ .context("failed to read child stderr")?; + assert!( + status.success(), + "codex-stdio-to-uds exited with {status}; server events: {:?}; stderr: {}", + server_events, + String::from_utf8_lossy(&stderr).trim_end() + ); + assert_eq!(stdout, b"response"); let received = rx .recv_timeout(Duration::from_secs(1)) .context("server did not receive data in time")?; - assert_eq!(received, b"request"); + assert_eq!(received, request); let server_result = server_thread .join() - .map_err(|_| anyhow::anyhow!("server thread panicked"))?; + .map_err(|_| anyhow!("server thread panicked"))?; server_result.context("server failed")?; Ok(())