Avoid hang when tool's process spawns grandchild that shares stderr/stdout (#6575)
We've received many reports of codex hanging when calling certain tools. [Here](https://github.com/openai/codex/issues/3204) is one example. This is likely a major cause. The problem occurs when `consume_truncated_output` waits for `stdout` and `stderr` to be closed once the child process terminates. This normally works fine, but it doesn't handle the case where the child has spawned grandchild processes that inherits `stdout` and `stderr`. The fix was originally written by @md-oai in [this PR](https://github.com/openai/codex/pull/1852), which has gone stale. I've copied the original fix (which looks sound to me) and added an integration test to prevent future regressions.
This commit is contained in:
parent
ad7eaa80f9
commit
73ed30d7e5
2 changed files with 146 additions and 2 deletions
|
|
@ -532,8 +532,52 @@ async fn consume_truncated_output(
|
|||
}
|
||||
};
|
||||
|
||||
let stdout = stdout_handle.await??;
|
||||
let stderr = stderr_handle.await??;
|
||||
// Wait for the stdout/stderr collection tasks but guard against them
|
||||
// hanging forever. In the normal case, both pipes are closed once the child
|
||||
// terminates so the tasks exit quickly. However, if the child process
|
||||
// spawned grandchildren that inherited its stdout/stderr file descriptors
|
||||
// those pipes may stay open after we `kill` the direct child on timeout.
|
||||
// That would cause the `read_capped` tasks to block on `read()`
|
||||
// indefinitely, effectively hanging the whole agent.
|
||||
|
||||
const IO_DRAIN_TIMEOUT_MS: u64 = 2_000; // 2 s should be plenty for local pipes
|
||||
|
||||
// We need mutable bindings so we can `abort()` them on timeout.
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
async fn await_with_timeout(
|
||||
handle: &mut JoinHandle<std::io::Result<StreamOutput<Vec<u8>>>>,
|
||||
timeout: Duration,
|
||||
) -> std::io::Result<StreamOutput<Vec<u8>>> {
|
||||
match tokio::time::timeout(timeout, &mut *handle).await {
|
||||
Ok(join_res) => match join_res {
|
||||
Ok(io_res) => io_res,
|
||||
Err(join_err) => Err(std::io::Error::other(join_err)),
|
||||
},
|
||||
Err(_elapsed) => {
|
||||
// Timeout: abort the task to avoid hanging on open pipes.
|
||||
handle.abort();
|
||||
Ok(StreamOutput {
|
||||
text: Vec::new(),
|
||||
truncated_after_lines: None,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut stdout_handle = stdout_handle;
|
||||
let mut stderr_handle = stderr_handle;
|
||||
|
||||
let stdout = await_with_timeout(
|
||||
&mut stdout_handle,
|
||||
Duration::from_millis(IO_DRAIN_TIMEOUT_MS),
|
||||
)
|
||||
.await?;
|
||||
let stderr = await_with_timeout(
|
||||
&mut stderr_handle,
|
||||
Duration::from_millis(IO_DRAIN_TIMEOUT_MS),
|
||||
)
|
||||
.await?;
|
||||
|
||||
drop(agg_tx);
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
#![cfg(not(target_os = "windows"))]
|
||||
#![allow(clippy::unwrap_used, clippy::expect_used)]
|
||||
|
||||
use std::fs;
|
||||
use std::time::Duration;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::Context;
|
||||
use anyhow::Result;
|
||||
use codex_core::features::Feature;
|
||||
|
|
@ -458,6 +462,102 @@ async fn shell_timeout_includes_timeout_prefix_and_metadata() -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn shell_timeout_handles_background_grandchild_stdout() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let server = start_mock_server().await;
|
||||
let mut builder = test_codex().with_config(|config| {
|
||||
config.model = "gpt-5".to_string();
|
||||
config.model_family = find_family_for_model("gpt-5").expect("gpt-5 is a valid model");
|
||||
config.sandbox_policy = SandboxPolicy::DangerFullAccess;
|
||||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let call_id = "shell-grandchild-timeout";
|
||||
let pid_path = test.cwd.path().join("grandchild_pid.txt");
|
||||
let script_path = test.cwd.path().join("spawn_detached.py");
|
||||
let script = format!(
|
||||
r#"import subprocess
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# Spawn a detached grandchild that inherits stdout/stderr so the pipe stays open.
|
||||
proc = subprocess.Popen(["/bin/sh", "-c", "sleep 60"], start_new_session=True)
|
||||
Path({pid_path:?}).write_text(str(proc.pid))
|
||||
time.sleep(60)
|
||||
"#
|
||||
);
|
||||
fs::write(&script_path, script)?;
|
||||
|
||||
let args = json!({
|
||||
"command": ["python3", script_path.to_string_lossy()],
|
||||
"timeout_ms": 200,
|
||||
});
|
||||
|
||||
mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_function_call(call_id, "shell", &serde_json::to_string(&args)?),
|
||||
ev_completed("resp-1"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
let second_mock = mount_sse_once(
|
||||
&server,
|
||||
sse(vec![
|
||||
ev_assistant_message("msg-1", "done"),
|
||||
ev_completed("resp-2"),
|
||||
]),
|
||||
)
|
||||
.await;
|
||||
|
||||
let start = Instant::now();
|
||||
let output_str = tokio::time::timeout(Duration::from_secs(10), async {
|
||||
submit_turn(
|
||||
&test,
|
||||
"run a command with a detached grandchild",
|
||||
AskForApproval::Never,
|
||||
SandboxPolicy::DangerFullAccess,
|
||||
)
|
||||
.await?;
|
||||
let timeout_item = second_mock.single_request().function_call_output(call_id);
|
||||
timeout_item
|
||||
.get("output")
|
||||
.and_then(Value::as_str)
|
||||
.map(str::to_string)
|
||||
.context("timeout output string")
|
||||
})
|
||||
.await
|
||||
.context("exec call should not hang waiting for grandchild pipes to close")??;
|
||||
let elapsed = start.elapsed();
|
||||
|
||||
if let Ok(output_json) = serde_json::from_str::<Value>(&output_str) {
|
||||
assert_eq!(
|
||||
output_json["metadata"]["exit_code"].as_i64(),
|
||||
Some(124),
|
||||
"expected timeout exit code 124",
|
||||
);
|
||||
} else {
|
||||
let timeout_pattern = r"(?is)command timed out|timeout";
|
||||
assert_regex_match(timeout_pattern, &output_str);
|
||||
}
|
||||
|
||||
assert!(
|
||||
elapsed < Duration::from_secs(9),
|
||||
"command should return shortly after timeout even with live grandchildren: {elapsed:?}"
|
||||
);
|
||||
|
||||
if let Ok(pid_str) = fs::read_to_string(&pid_path)
|
||||
&& let Ok(pid) = pid_str.trim().parse::<libc::pid_t>()
|
||||
{
|
||||
unsafe { libc::kill(pid, libc::SIGKILL) };
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn shell_spawn_failure_truncates_exec_error() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue