From c2ca51273f8a1cd8c3467fc8424456d5785f65ff Mon Sep 17 00:00:00 2001 From: jif-oai Date: Mon, 9 Feb 2026 22:14:33 +0000 Subject: [PATCH] feat: use a notify instead of grace to close ue process (#11219) --- codex-rs/core/src/unified_exec/process.rs | 22 +++++++++- .../core/src/unified_exec/process_manager.rs | 42 +++++++++++++++++-- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/codex-rs/core/src/unified_exec/process.rs b/codex-rs/core/src/unified_exec/process.rs index d52d1a6a9..69790dbad 100644 --- a/codex-rs/core/src/unified_exec/process.rs +++ b/codex-rs/core/src/unified_exec/process.rs @@ -1,6 +1,8 @@ #![allow(clippy::module_inception)] use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use tokio::sync::Mutex; use tokio::sync::Notify; use tokio::sync::mpsc; @@ -26,6 +28,8 @@ pub(crate) type OutputBuffer = Arc>; pub(crate) struct OutputHandles { pub(crate) output_buffer: OutputBuffer, pub(crate) output_notify: Arc, + pub(crate) output_closed: Arc, + pub(crate) output_closed_notify: Arc, pub(crate) cancellation_token: CancellationToken, } @@ -34,6 +38,8 @@ pub(crate) struct UnifiedExecProcess { process_handle: ExecCommandSession, output_buffer: OutputBuffer, output_notify: Arc, + output_closed: Arc, + output_closed_notify: Arc, cancellation_token: CancellationToken, output_drained: Arc, output_task: JoinHandle<()>, @@ -48,11 +54,15 @@ impl UnifiedExecProcess { ) -> Self { let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default())); let output_notify = Arc::new(Notify::new()); + let output_closed = Arc::new(AtomicBool::new(false)); + let output_closed_notify = Arc::new(Notify::new()); let cancellation_token = CancellationToken::new(); let output_drained = Arc::new(Notify::new()); let mut receiver = initial_output_rx; let buffer_clone = Arc::clone(&output_buffer); let notify_clone = Arc::clone(&output_notify); + let output_closed_clone = Arc::clone(&output_closed); + let output_closed_notify_clone = Arc::clone(&output_closed_notify); let output_task = tokio::spawn(async move { loop { match receiver.recv().await { @@ -63,7 +73,11 @@ impl UnifiedExecProcess { notify_clone.notify_waiters(); } Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + output_closed_clone.store(true, Ordering::Release); + output_closed_notify_clone.notify_waiters(); + break; + } }; } }); @@ -72,6 +86,8 @@ impl UnifiedExecProcess { process_handle, output_buffer, output_notify, + output_closed, + output_closed_notify, cancellation_token, output_drained, output_task, @@ -87,6 +103,8 @@ impl UnifiedExecProcess { OutputHandles { output_buffer: Arc::clone(&self.output_buffer), output_notify: Arc::clone(&self.output_notify), + output_closed: Arc::clone(&self.output_closed), + output_closed_notify: Arc::clone(&self.output_closed_notify), cancellation_token: self.cancellation_token.clone(), } } @@ -112,6 +130,8 @@ impl UnifiedExecProcess { } pub(super) fn terminate(&self) { + self.output_closed.store(true, Ordering::Release); + self.output_closed_notify.notify_waiters(); self.process_handle.terminate(); self.cancellation_token.cancel(); self.output_task.abort(); diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index a81b36c7a..cee73fae2 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; +use std::sync::atomic::AtomicBool; use tokio::sync::Notify; use tokio::sync::mpsc; use tokio::time::Duration; @@ -71,6 +72,8 @@ struct PreparedProcessHandles { writer_tx: mpsc::Sender>, output_buffer: OutputBuffer, output_notify: Arc, + output_closed: Arc, + output_closed_notify: Arc, cancellation_token: CancellationToken, command: Vec, process_id: String, @@ -161,12 +164,16 @@ impl UnifiedExecProcessManager { let OutputHandles { output_buffer, output_notify, + output_closed, + output_closed_notify, cancellation_token, } = process.output_handles(); let deadline = start + Duration::from_millis(yield_time_ms); let collected = Self::collect_output_until_deadline( &output_buffer, &output_notify, + &output_closed, + &output_closed_notify, &cancellation_token, deadline, ) @@ -248,6 +255,8 @@ impl UnifiedExecProcessManager { writer_tx, output_buffer, output_notify, + output_closed, + output_closed_notify, cancellation_token, command: session_command, process_id, @@ -279,6 +288,8 @@ impl UnifiedExecProcessManager { let collected = Self::collect_output_until_deadline( &output_buffer, &output_notify, + &output_closed, + &output_closed_notify, &cancellation_token, deadline, ) @@ -369,6 +380,8 @@ impl UnifiedExecProcessManager { let OutputHandles { output_buffer, output_notify, + output_closed, + output_closed_notify, cancellation_token, } = entry.process.output_handles(); @@ -376,6 +389,8 @@ impl UnifiedExecProcessManager { writer_tx: entry.process.writer_sender(), output_buffer, output_notify, + output_closed, + output_closed_notify, cancellation_token, command: entry.command.clone(), process_id: entry.process_id.clone(), @@ -532,13 +547,16 @@ impl UnifiedExecProcessManager { pub(super) async fn collect_output_until_deadline( output_buffer: &OutputBuffer, output_notify: &Arc, + output_closed: &Arc, + output_closed_notify: &Arc, cancellation_token: &CancellationToken, deadline: Instant, ) -> Vec { - const POST_EXIT_OUTPUT_GRACE: Duration = Duration::from_millis(50); + const POST_EXIT_CLOSE_WAIT_CAP: Duration = Duration::from_millis(50); let mut collected: Vec = Vec::with_capacity(4096); let mut exit_signal_received = cancellation_token.is_cancelled(); + let mut post_exit_deadline: Option = None; loop { let drained_chunks: Vec>; let mut wait_for_output = None; @@ -552,20 +570,36 @@ impl UnifiedExecProcessManager { if drained_chunks.is_empty() { exit_signal_received |= cancellation_token.is_cancelled(); + if exit_signal_received && output_closed.load(std::sync::atomic::Ordering::Acquire) + { + break; + } let remaining = deadline.saturating_duration_since(Instant::now()); if remaining == Duration::ZERO { break; } - let notified = wait_for_output.unwrap_or_else(|| output_notify.notified()); if exit_signal_received { - let grace = remaining.min(POST_EXIT_OUTPUT_GRACE); - if tokio::time::timeout(grace, notified).await.is_err() { + let now = Instant::now(); + let close_wait_deadline = *post_exit_deadline + .get_or_insert_with(|| now + remaining.min(POST_EXIT_CLOSE_WAIT_CAP)); + let close_wait_remaining = close_wait_deadline.saturating_duration_since(now); + if close_wait_remaining == Duration::ZERO { break; } + let notified = wait_for_output.unwrap_or_else(|| output_notify.notified()); + let closed = output_closed_notify.notified(); + tokio::pin!(notified); + tokio::pin!(closed); + tokio::select! { + _ = &mut notified => {} + _ = &mut closed => {} + _ = tokio::time::sleep(close_wait_remaining) => break, + } continue; } + let notified = wait_for_output.unwrap_or_else(|| output_notify.notified()); tokio::pin!(notified); let exit_notified = cancellation_token.cancelled(); tokio::pin!(exit_notified);