Fix stdio-to-uds peer-close flake (#13882)

## What changed
- `codex-stdio-to-uds` now tolerates `NotConnected` when
`shutdown(Write)` happens after the peer has already closed.
- The socket test was rewritten to send stdin from a fixture file and to
read an exact request payload length instead of waiting on EOF timing.

## Why this fixes the flake
- This one exposed a real cross-platform runtime edge case: on macOS,
the peer can close first after a successful exchange, and
`shutdown(Write)` can report `NotConnected` even though the interaction
already succeeded.
- Treating that specific ordering as a harmless shutdown condition
removes the production-level false failure.
- The old test compounded the problem by depending on EOF timing, which
varies by platform and scheduler. Exact-length IO makes the test
deterministic and focused on the actual data exchange.

## Scope
- Production logic change with matching test rewrite.
This commit is contained in:
Ahmed Ibrahim 2026-03-12 09:52:50 -07:00 committed by GitHub
parent a30b807efe
commit 09aa71adb7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 98 additions and 17 deletions

3
codex-rs/Cargo.lock generated
View file

@ -1439,7 +1439,6 @@ dependencies = [
"codex-utils-cargo-bin",
"codex-utils-cli",
"codex-utils-json-to-toml",
"codex-utils-pty",
"core_test_support",
"futures",
"opentelemetry",
@ -2439,6 +2438,7 @@ dependencies = [
"anyhow",
"chrono",
"clap",
"codex-otel",
"codex-protocol",
"dirs",
"log",
@ -2458,7 +2458,6 @@ name = "codex-stdio-to-uds"
version = "0.0.0"
dependencies = [
"anyhow",
"assert_cmd",
"codex-utils-cargo-bin",
"pretty_assertions",
"tempfile",

View file

@ -22,7 +22,6 @@ anyhow = { workspace = true }
uds_windows = { workspace = true }
[dev-dependencies]
assert_cmd = { workspace = true }
codex-utils-cargo-bin = { workspace = true }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }

View file

@ -39,9 +39,13 @@ pub fn run(socket_path: &Path) -> anyhow::Result<()> {
io::copy(&mut handle, &mut stream).context("failed to copy data from stdin to socket")?;
}
stream
.shutdown(Shutdown::Write)
.context("failed to shutdown socket writer")?;
// The peer can close immediately after sending its response; in that race,
// half-closing our write side can report NotConnected on some platforms.
if let Err(err) = stream.shutdown(Shutdown::Write)
&& err.kind() != io::ErrorKind::NotConnected
{
return Err(err).context("failed to shutdown socket writer");
}
let stdout_result = stdout_thread
.join()

View file

@ -1,12 +1,15 @@
use std::io::ErrorKind;
use std::io::Read;
use std::io::Write;
use std::process::Command;
use std::process::Stdio;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::Instant;
use anyhow::Context;
use assert_cmd::Command;
use anyhow::anyhow;
use pretty_assertions::assert_eq;
#[cfg(unix)]
@ -17,8 +20,18 @@ use uds_windows::UnixListener;
#[test]
fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
// This test intentionally avoids `read_to_end()` on the server side because
// waiting for EOF can race with socket half-close behavior on slower runners.
// Reading the exact request length keeps the test deterministic.
//
// We also use `std::process::Command` (instead of `assert_cmd`) so we can
// poll/kill on timeout and include incremental server events + stderr in
// failure output, which makes flaky failures actionable to debug.
let dir = tempfile::TempDir::new().context("failed to create temp dir")?;
let socket_path = dir.path().join("socket");
let request = b"request";
let request_path = dir.path().join("request.txt");
std::fs::write(&request_path, request).context("failed to write child stdin fixture")?;
let listener = match UnixListener::bind(&socket_path) {
Ok(listener) => listener,
Err(err) if err.kind() == ErrorKind::PermissionDenied => {
@ -31,37 +44,103 @@ fn pipes_stdin_and_stdout_through_socket() -> anyhow::Result<()> {
};
let (tx, rx) = mpsc::channel();
let (event_tx, event_rx) = mpsc::channel();
let server_thread = thread::spawn(move || -> anyhow::Result<()> {
let _ = event_tx.send("waiting for accept".to_string());
let (mut connection, _) = listener
.accept()
.context("failed to accept test connection")?;
let mut received = Vec::new();
let _ = event_tx.send("accepted connection".to_string());
let mut received = vec![0; request.len()];
connection
.read_to_end(&mut received)
.read_exact(&mut received)
.context("failed to read data from client")?;
let _ = event_tx.send(format!("read {} bytes", received.len()));
tx.send(received)
.map_err(|_| anyhow::anyhow!("failed to send received bytes to test thread"))?;
.map_err(|_| anyhow!("failed to send received bytes to test thread"))?;
connection
.write_all(b"response")
.context("failed to write response to client")?;
let _ = event_tx.send("wrote response".to_string());
Ok(())
});
Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
let stdin = std::fs::File::open(&request_path).context("failed to open child stdin fixture")?;
let mut child = Command::new(codex_utils_cargo_bin::cargo_bin("codex-stdio-to-uds")?)
.arg(&socket_path)
.write_stdin("request")
.assert()
.success()
.stdout("response");
.stdin(Stdio::from(stdin))
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("failed to spawn codex-stdio-to-uds")?;
let mut child_stdout = child.stdout.take().context("missing child stdout")?;
let mut child_stderr = child.stderr.take().context("missing child stderr")?;
let (stdout_tx, stdout_rx) = mpsc::channel();
let (stderr_tx, stderr_rx) = mpsc::channel();
thread::spawn(move || {
let mut stdout = Vec::new();
let result = child_stdout.read_to_end(&mut stdout).map(|_| stdout);
let _ = stdout_tx.send(result);
});
thread::spawn(move || {
let mut stderr = Vec::new();
let result = child_stderr.read_to_end(&mut stderr).map(|_| stderr);
let _ = stderr_tx.send(result);
});
let mut server_events = Vec::new();
let deadline = Instant::now() + Duration::from_secs(5);
let status = loop {
while let Ok(event) = event_rx.try_recv() {
server_events.push(event);
}
if let Some(status) = child.try_wait().context("failed to poll child status")? {
break status;
}
if Instant::now() >= deadline {
let _ = child.kill();
let _ = child.wait();
let stderr = stderr_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stderr after kill")?
.context("failed to read child stderr")?;
anyhow::bail!(
"codex-stdio-to-uds did not exit in time; server events: {:?}; stderr: {}",
server_events,
String::from_utf8_lossy(&stderr).trim_end()
);
}
thread::sleep(Duration::from_millis(25));
};
let stdout = stdout_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stdout")?
.context("failed to read child stdout")?;
let stderr = stderr_rx
.recv_timeout(Duration::from_secs(1))
.context("timed out waiting for child stderr")?
.context("failed to read child stderr")?;
assert!(
status.success(),
"codex-stdio-to-uds exited with {status}; server events: {:?}; stderr: {}",
server_events,
String::from_utf8_lossy(&stderr).trim_end()
);
assert_eq!(stdout, b"response");
let received = rx
.recv_timeout(Duration::from_secs(1))
.context("server did not receive data in time")?;
assert_eq!(received, b"request");
assert_eq!(received, request);
let server_result = server_thread
.join()
.map_err(|_| anyhow::anyhow!("server thread panicked"))?;
.map_err(|_| anyhow!("server thread panicked"))?;
server_result.context("server failed")?;
Ok(())