feat: use a notify instead of grace to close ue process (#11219)

This commit is contained in:
jif-oai 2026-02-09 22:14:33 +00:00 committed by GitHub
parent cca13fb03a
commit c2ca51273f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 59 additions and 5 deletions

View file

@ -1,6 +1,8 @@
#![allow(clippy::module_inception)]
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use tokio::sync::Mutex;
use tokio::sync::Notify;
use tokio::sync::mpsc;
@ -26,6 +28,8 @@ pub(crate) type OutputBuffer = Arc<Mutex<HeadTailBuffer>>;
pub(crate) struct OutputHandles {
pub(crate) output_buffer: OutputBuffer,
pub(crate) output_notify: Arc<Notify>,
pub(crate) output_closed: Arc<AtomicBool>,
pub(crate) output_closed_notify: Arc<Notify>,
pub(crate) cancellation_token: CancellationToken,
}
@ -34,6 +38,8 @@ pub(crate) struct UnifiedExecProcess {
process_handle: ExecCommandSession,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
output_closed_notify: Arc<Notify>,
cancellation_token: CancellationToken,
output_drained: Arc<Notify>,
output_task: JoinHandle<()>,
@ -48,11 +54,15 @@ impl UnifiedExecProcess {
) -> Self {
let output_buffer = Arc::new(Mutex::new(HeadTailBuffer::default()));
let output_notify = Arc::new(Notify::new());
let output_closed = Arc::new(AtomicBool::new(false));
let output_closed_notify = Arc::new(Notify::new());
let cancellation_token = CancellationToken::new();
let output_drained = Arc::new(Notify::new());
let mut receiver = initial_output_rx;
let buffer_clone = Arc::clone(&output_buffer);
let notify_clone = Arc::clone(&output_notify);
let output_closed_clone = Arc::clone(&output_closed);
let output_closed_notify_clone = Arc::clone(&output_closed_notify);
let output_task = tokio::spawn(async move {
loop {
match receiver.recv().await {
@ -63,7 +73,11 @@ impl UnifiedExecProcess {
notify_clone.notify_waiters();
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue,
Err(tokio::sync::broadcast::error::RecvError::Closed) => break,
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
output_closed_clone.store(true, Ordering::Release);
output_closed_notify_clone.notify_waiters();
break;
}
};
}
});
@ -72,6 +86,8 @@ impl UnifiedExecProcess {
process_handle,
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
output_drained,
output_task,
@ -87,6 +103,8 @@ impl UnifiedExecProcess {
OutputHandles {
output_buffer: Arc::clone(&self.output_buffer),
output_notify: Arc::clone(&self.output_notify),
output_closed: Arc::clone(&self.output_closed),
output_closed_notify: Arc::clone(&self.output_closed_notify),
cancellation_token: self.cancellation_token.clone(),
}
}
@ -112,6 +130,8 @@ impl UnifiedExecProcess {
}
pub(super) fn terminate(&self) {
self.output_closed.store(true, Ordering::Release);
self.output_closed_notify.notify_waiters();
self.process_handle.terminate();
self.cancellation_token.cancel();
self.output_task.abort();

View file

@ -4,6 +4,7 @@ use std::collections::HashMap;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use tokio::sync::Notify;
use tokio::sync::mpsc;
use tokio::time::Duration;
@ -71,6 +72,8 @@ struct PreparedProcessHandles {
writer_tx: mpsc::Sender<Vec<u8>>,
output_buffer: OutputBuffer,
output_notify: Arc<Notify>,
output_closed: Arc<AtomicBool>,
output_closed_notify: Arc<Notify>,
cancellation_token: CancellationToken,
command: Vec<String>,
process_id: String,
@ -161,12 +164,16 @@ impl UnifiedExecProcessManager {
let OutputHandles {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
} = process.output_handles();
let deadline = start + Duration::from_millis(yield_time_ms);
let collected = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
deadline,
)
@ -248,6 +255,8 @@ impl UnifiedExecProcessManager {
writer_tx,
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
command: session_command,
process_id,
@ -279,6 +288,8 @@ impl UnifiedExecProcessManager {
let collected = Self::collect_output_until_deadline(
&output_buffer,
&output_notify,
&output_closed,
&output_closed_notify,
&cancellation_token,
deadline,
)
@ -369,6 +380,8 @@ impl UnifiedExecProcessManager {
let OutputHandles {
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
} = entry.process.output_handles();
@ -376,6 +389,8 @@ impl UnifiedExecProcessManager {
writer_tx: entry.process.writer_sender(),
output_buffer,
output_notify,
output_closed,
output_closed_notify,
cancellation_token,
command: entry.command.clone(),
process_id: entry.process_id.clone(),
@ -532,13 +547,16 @@ impl UnifiedExecProcessManager {
pub(super) async fn collect_output_until_deadline(
output_buffer: &OutputBuffer,
output_notify: &Arc<Notify>,
output_closed: &Arc<AtomicBool>,
output_closed_notify: &Arc<Notify>,
cancellation_token: &CancellationToken,
deadline: Instant,
) -> Vec<u8> {
const POST_EXIT_OUTPUT_GRACE: Duration = Duration::from_millis(50);
const POST_EXIT_CLOSE_WAIT_CAP: Duration = Duration::from_millis(50);
let mut collected: Vec<u8> = Vec::with_capacity(4096);
let mut exit_signal_received = cancellation_token.is_cancelled();
let mut post_exit_deadline: Option<Instant> = None;
loop {
let drained_chunks: Vec<Vec<u8>>;
let mut wait_for_output = None;
@ -552,20 +570,36 @@ impl UnifiedExecProcessManager {
if drained_chunks.is_empty() {
exit_signal_received |= cancellation_token.is_cancelled();
if exit_signal_received && output_closed.load(std::sync::atomic::Ordering::Acquire)
{
break;
}
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining == Duration::ZERO {
break;
}
let notified = wait_for_output.unwrap_or_else(|| output_notify.notified());
if exit_signal_received {
let grace = remaining.min(POST_EXIT_OUTPUT_GRACE);
if tokio::time::timeout(grace, notified).await.is_err() {
let now = Instant::now();
let close_wait_deadline = *post_exit_deadline
.get_or_insert_with(|| now + remaining.min(POST_EXIT_CLOSE_WAIT_CAP));
let close_wait_remaining = close_wait_deadline.saturating_duration_since(now);
if close_wait_remaining == Duration::ZERO {
break;
}
let notified = wait_for_output.unwrap_or_else(|| output_notify.notified());
let closed = output_closed_notify.notified();
tokio::pin!(notified);
tokio::pin!(closed);
tokio::select! {
_ = &mut notified => {}
_ = &mut closed => {}
_ = tokio::time::sleep(close_wait_remaining) => break,
}
continue;
}
let notified = wait_for_output.unwrap_or_else(|| output_notify.notified());
tokio::pin!(notified);
let exit_notified = cancellation_token.cancelled();
tokio::pin!(exit_notified);