diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index 012fde953..765086ef1 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -532,8 +532,52 @@ async fn consume_truncated_output( } }; - let stdout = stdout_handle.await??; - let stderr = stderr_handle.await??; + // Wait for the stdout/stderr collection tasks but guard against them + // hanging forever. In the normal case, both pipes are closed once the child + // terminates so the tasks exit quickly. However, if the child process + // spawned grandchildren that inherited its stdout/stderr file descriptors + // those pipes may stay open after we `kill` the direct child on timeout. + // That would cause the `read_capped` tasks to block on `read()` + // indefinitely, effectively hanging the whole agent. + + const IO_DRAIN_TIMEOUT_MS: u64 = 2_000; // 2 s should be plenty for local pipes + + // We need mutable bindings so we can `abort()` them on timeout. + use tokio::task::JoinHandle; + + async fn await_with_timeout( + handle: &mut JoinHandle>>>, + timeout: Duration, + ) -> std::io::Result>> { + match tokio::time::timeout(timeout, &mut *handle).await { + Ok(join_res) => match join_res { + Ok(io_res) => io_res, + Err(join_err) => Err(std::io::Error::other(join_err)), + }, + Err(_elapsed) => { + // Timeout: abort the task to avoid hanging on open pipes. + handle.abort(); + Ok(StreamOutput { + text: Vec::new(), + truncated_after_lines: None, + }) + } + } + } + + let mut stdout_handle = stdout_handle; + let mut stderr_handle = stderr_handle; + + let stdout = await_with_timeout( + &mut stdout_handle, + Duration::from_millis(IO_DRAIN_TIMEOUT_MS), + ) + .await?; + let stderr = await_with_timeout( + &mut stderr_handle, + Duration::from_millis(IO_DRAIN_TIMEOUT_MS), + ) + .await?; drop(agg_tx); diff --git a/codex-rs/core/tests/suite/tools.rs b/codex-rs/core/tests/suite/tools.rs index a60257b6b..2e92e6596 100644 --- a/codex-rs/core/tests/suite/tools.rs +++ b/codex-rs/core/tests/suite/tools.rs @@ -1,6 +1,10 @@ #![cfg(not(target_os = "windows"))] #![allow(clippy::unwrap_used, clippy::expect_used)] +use std::fs; +use std::time::Duration; +use std::time::Instant; + use anyhow::Context; use anyhow::Result; use codex_core::features::Feature; @@ -458,6 +462,102 @@ async fn shell_timeout_includes_timeout_prefix_and_metadata() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn shell_timeout_handles_background_grandchild_stdout() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = start_mock_server().await; + let mut builder = test_codex().with_config(|config| { + config.model = "gpt-5".to_string(); + config.model_family = find_family_for_model("gpt-5").expect("gpt-5 is a valid model"); + config.sandbox_policy = SandboxPolicy::DangerFullAccess; + }); + let test = builder.build(&server).await?; + + let call_id = "shell-grandchild-timeout"; + let pid_path = test.cwd.path().join("grandchild_pid.txt"); + let script_path = test.cwd.path().join("spawn_detached.py"); + let script = format!( + r#"import subprocess +import time +from pathlib import Path + +# Spawn a detached grandchild that inherits stdout/stderr so the pipe stays open. +proc = subprocess.Popen(["/bin/sh", "-c", "sleep 60"], start_new_session=True) +Path({pid_path:?}).write_text(str(proc.pid)) +time.sleep(60) +"# + ); + fs::write(&script_path, script)?; + + let args = json!({ + "command": ["python3", script_path.to_string_lossy()], + "timeout_ms": 200, + }); + + mount_sse_once( + &server, + sse(vec![ + ev_response_created("resp-1"), + ev_function_call(call_id, "shell", &serde_json::to_string(&args)?), + ev_completed("resp-1"), + ]), + ) + .await; + let second_mock = mount_sse_once( + &server, + sse(vec![ + ev_assistant_message("msg-1", "done"), + ev_completed("resp-2"), + ]), + ) + .await; + + let start = Instant::now(); + let output_str = tokio::time::timeout(Duration::from_secs(10), async { + submit_turn( + &test, + "run a command with a detached grandchild", + AskForApproval::Never, + SandboxPolicy::DangerFullAccess, + ) + .await?; + let timeout_item = second_mock.single_request().function_call_output(call_id); + timeout_item + .get("output") + .and_then(Value::as_str) + .map(str::to_string) + .context("timeout output string") + }) + .await + .context("exec call should not hang waiting for grandchild pipes to close")??; + let elapsed = start.elapsed(); + + if let Ok(output_json) = serde_json::from_str::(&output_str) { + assert_eq!( + output_json["metadata"]["exit_code"].as_i64(), + Some(124), + "expected timeout exit code 124", + ); + } else { + let timeout_pattern = r"(?is)command timed out|timeout"; + assert_regex_match(timeout_pattern, &output_str); + } + + assert!( + elapsed < Duration::from_secs(9), + "command should return shortly after timeout even with live grandchildren: {elapsed:?}" + ); + + if let Ok(pid_str) = fs::read_to_string(&pid_path) + && let Ok(pid) = pid_str.trim().parse::() + { + unsafe { libc::kill(pid, libc::SIGKILL) }; + } + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn shell_spawn_failure_truncates_exec_error() -> Result<()> { skip_if_no_network!(Ok(()));