From 32e4a5d5d9ae1acad2e85a142c1b2d446306a4e5 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 16 Mar 2026 22:19:51 -0700 Subject: [PATCH] [stack 4/4] Reduce realtime self-interruptions during playback (#14827) ## Stack Position 4/4. Top-of-stack sibling built on #14830. ## Base - #14830 ## Sibling - #14829 ## Scope - Gate low-level mic chunks while speaker playback is active, while still allowing spoken barge-in. --------- Co-authored-by: Codex --- codex-rs/tui/src/chatwidget/realtime.rs | 50 ++++-- codex-rs/tui/src/lib.rs | 12 +- codex-rs/tui/src/voice.rs | 214 ++++++++++++++++++++---- 3 files changed, 227 insertions(+), 49 deletions(-) diff --git a/codex-rs/tui/src/chatwidget/realtime.rs b/codex-rs/tui/src/chatwidget/realtime.rs index 6b5042307..37646880e 100644 --- a/codex-rs/tui/src/chatwidget/realtime.rs +++ b/codex-rs/tui/src/chatwidget/realtime.rs @@ -5,6 +5,8 @@ use codex_protocol::protocol::RealtimeConversationClosedEvent; use codex_protocol::protocol::RealtimeConversationRealtimeEvent; use codex_protocol::protocol::RealtimeConversationStartedEvent; use codex_protocol::protocol::RealtimeEvent; +#[cfg(not(target_os = "linux"))] +use std::sync::atomic::AtomicUsize; const REALTIME_CONVERSATION_PROMPT: &str = "You are in a realtime voice conversation in the Codex TUI. Respond conversationally and concisely."; @@ -30,6 +32,10 @@ pub(super) struct RealtimeConversationUiState { capture: Option, #[cfg(not(target_os = "linux"))] audio_player: Option, + #[cfg(not(target_os = "linux"))] + // Shared queue depth lets capture suppress echoed speaker audio without + // taking the playback queue lock from the input callback. + playback_queued_samples: Arc, } impl RealtimeConversationUiState { @@ -192,16 +198,15 @@ impl ChatWidget { None } - fn realtime_footer_hint_items() -> Vec<(String, String)> { - vec![("/realtime".to_string(), "stop live voice".to_string())] - } - pub(super) fn start_realtime_conversation(&mut self) { self.realtime_conversation.phase = RealtimeConversationPhase::Starting; self.realtime_conversation.requested_close = false; self.realtime_conversation.session_id = None; self.realtime_conversation.warned_audio_only_submission = false; - self.set_footer_hint_override(Some(Self::realtime_footer_hint_items())); + self.set_footer_hint_override(Some(vec![( + "/realtime".to_string(), + "stop live voice".to_string(), + )])); self.submit_op(Op::RealtimeConversationStart(ConversationStartParams { prompt: REALTIME_CONVERSATION_PROMPT.to_string(), session_id: None, @@ -251,7 +256,10 @@ impl ChatWidget { self.realtime_conversation.phase = RealtimeConversationPhase::Active; self.realtime_conversation.session_id = ev.session_id; self.realtime_conversation.warned_audio_only_submission = false; - self.set_footer_hint_override(Some(Self::realtime_footer_hint_items())); + self.set_footer_hint_override(Some(vec![( + "/realtime".to_string(), + "stop live voice".to_string(), + )])); self.start_realtime_local_audio(); self.request_redraw(); } @@ -264,11 +272,17 @@ impl ChatWidget { RealtimeEvent::SessionUpdated { session_id, .. } => { self.realtime_conversation.session_id = Some(session_id); } - RealtimeEvent::InputAudioSpeechStarted(_) => {} + RealtimeEvent::InputAudioSpeechStarted(_) | RealtimeEvent::ResponseCancelled(_) => { + #[cfg(not(target_os = "linux"))] + if let Some(player) = &self.realtime_conversation.audio_player { + // Once the server detects user speech or the current response is cancelled, + // any buffered assistant audio is stale and should stop gating mic input. + player.clear(); + } + } RealtimeEvent::InputTranscriptDelta(_) => {} RealtimeEvent::OutputTranscriptDelta(_) => {} RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame), - RealtimeEvent::ResponseCancelled(_) => {} RealtimeEvent::ConversationItemAdded(_item) => {} RealtimeEvent::ConversationItemDone { .. } => {} RealtimeEvent::HandoffRequested(_) => {} @@ -296,8 +310,11 @@ impl ChatWidget { #[cfg(not(target_os = "linux"))] { if self.realtime_conversation.audio_player.is_none() { - self.realtime_conversation.audio_player = - crate::voice::RealtimeAudioPlayer::start(&self.config).ok(); + self.realtime_conversation.audio_player = crate::voice::RealtimeAudioPlayer::start( + &self.config, + Arc::clone(&self.realtime_conversation.playback_queued_samples), + ) + .ok(); } if let Some(player) = &self.realtime_conversation.audio_player && let Err(err) = player.enqueue_frame(frame) @@ -324,6 +341,7 @@ impl ChatWidget { let capture = match crate::voice::VoiceCapture::start_realtime( &self.config, self.app_event_tx.clone(), + Arc::clone(&self.realtime_conversation.playback_queued_samples), ) { Ok(capture) => capture, Err(err) => { @@ -342,8 +360,11 @@ impl ChatWidget { self.realtime_conversation.capture_stop_flag = Some(stop_flag.clone()); self.realtime_conversation.capture = Some(capture); if self.realtime_conversation.audio_player.is_none() { - self.realtime_conversation.audio_player = - crate::voice::RealtimeAudioPlayer::start(&self.config).ok(); + self.realtime_conversation.audio_player = crate::voice::RealtimeAudioPlayer::start( + &self.config, + Arc::clone(&self.realtime_conversation.playback_queued_samples), + ) + .ok(); } std::thread::spawn(move || { @@ -381,7 +402,10 @@ impl ChatWidget { } RealtimeAudioDeviceKind::Speaker => { self.stop_realtime_speaker(); - match crate::voice::RealtimeAudioPlayer::start(&self.config) { + match crate::voice::RealtimeAudioPlayer::start( + &self.config, + Arc::clone(&self.realtime_conversation.playback_queued_samples), + ) { Ok(player) => { self.realtime_conversation.audio_player = Some(player); } diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 3c1151773..c7f809cc6 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -140,6 +140,7 @@ mod voice { use std::sync::Mutex; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU16; + use std::sync::atomic::AtomicUsize; pub struct RecordedAudio { pub data: Vec, @@ -158,7 +159,11 @@ mod voice { Err("voice input is unavailable in this build".to_string()) } - pub fn start_realtime(_config: &Config, _tx: AppEventSender) -> Result { + pub fn start_realtime( + _config: &Config, + _tx: AppEventSender, + _playback_queued_samples: Arc, + ) -> Result { Err("voice input is unavailable in this build".to_string()) } @@ -198,7 +203,10 @@ mod voice { } impl RealtimeAudioPlayer { - pub(crate) fn start(_config: &Config) -> Result { + pub(crate) fn start( + _config: &Config, + _queued_samples: Arc, + ) -> Result { Err("voice output is unavailable in this build".to_string()) } diff --git a/codex-rs/tui/src/voice.rs b/codex-rs/tui/src/voice.rs index 07adcfd0a..ba260b028 100644 --- a/codex-rs/tui/src/voice.rs +++ b/codex-rs/tui/src/voice.rs @@ -1,5 +1,7 @@ use crate::app_event::AppEvent; use crate::app_event_sender::AppEventSender; +use crate::audio_device::preferred_input_config; +use crate::audio_device::select_configured_input_device_and_config; use base64::Engine; use codex_client::build_reqwest_client_with_custom_ca; use codex_core::auth::AuthCredentialsStoreMode; @@ -23,7 +25,10 @@ use std::sync::Arc; use std::sync::Mutex; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU16; +use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; +use std::time::Duration; +use std::time::Instant; use tracing::error; use tracing::info; use tracing::trace; @@ -31,6 +36,13 @@ use tracing::trace; const AUDIO_MODEL: &str = "gpt-4o-mini-transcribe"; const MODEL_AUDIO_SAMPLE_RATE: u32 = 24_000; const MODEL_AUDIO_CHANNELS: u16 = 1; +// While playback is buffered, ignore low-level mic input that is likely just +// speaker echo. A user who starts talking over playback should cross this peak +// threshold and reopen the gate. +const REALTIME_INTERRUPT_INPUT_PEAK_THRESHOLD: u16 = 4_000; +// After we decide an interruption is intentional, keep forwarding the next few +// callbacks so trailing syllables are not chopped up between chunks. +const REALTIME_INTERRUPT_GRACE_PERIOD: Duration = Duration::from_millis(900); struct TranscriptionAuthContext { mode: AuthMode, @@ -79,8 +91,12 @@ impl VoiceCapture { }) } - pub fn start_realtime(config: &Config, tx: AppEventSender) -> Result { - let (device, config) = select_realtime_input_device_and_config(config)?; + pub fn start_realtime( + config: &Config, + tx: AppEventSender, + playback_queued_samples: Arc, + ) -> Result { + let (device, config) = select_configured_input_device_and_config(config)?; let sample_rate = config.sample_rate().0; let channels = config.channels(); @@ -94,6 +110,7 @@ impl VoiceCapture { sample_rate, channels, tx, + playback_queued_samples, last_peak.clone(), )?; stream @@ -273,16 +290,10 @@ fn select_default_input_device_and_config() let device = host .default_input_device() .ok_or_else(|| "no input audio device available".to_string())?; - let config = crate::audio_device::preferred_input_config(&device)?; + let config = preferred_input_config(&device)?; Ok((device, config)) } -fn select_realtime_input_device_and_config( - config: &Config, -) -> Result<(cpal::Device, cpal::SupportedStreamConfig), String> { - crate::audio_device::select_configured_input_device_and_config(config) -} - fn build_input_stream( device: &cpal::Device, config: &cpal::SupportedStreamConfig, @@ -343,17 +354,33 @@ fn build_realtime_input_stream( sample_rate: u32, channels: u16, tx: AppEventSender, + playback_queued_samples: Arc, last_peak: Arc, ) -> Result { match config.sample_format() { cpal::SampleFormat::F32 => device .build_input_stream( &config.clone().into(), - move |input: &[f32], _| { - let peak = peak_f32(input); - last_peak.store(peak, Ordering::Relaxed); - let samples = input.iter().copied().map(f32_to_i16).collect::>(); - send_realtime_audio_chunk(&tx, samples, sample_rate, channels); + { + let playback_queued_samples = Arc::clone(&playback_queued_samples); + let last_peak = Arc::clone(&last_peak); + let tx = tx; + let mut allow_input_until = None; + move |input: &[f32], _| { + let peak = peak_f32(input); + if !should_send_realtime_input( + peak, + &playback_queued_samples, + &mut allow_input_until, + Instant::now(), + ) { + last_peak.store(0, Ordering::Relaxed); + return; + } + last_peak.store(peak, Ordering::Relaxed); + let samples = input.iter().copied().map(f32_to_i16).collect::>(); + send_realtime_audio_chunk(&tx, samples, sample_rate, channels); + } }, move |err| error!("audio input error: {err}"), None, @@ -362,10 +389,25 @@ fn build_realtime_input_stream( cpal::SampleFormat::I16 => device .build_input_stream( &config.clone().into(), - move |input: &[i16], _| { - let peak = peak_i16(input); - last_peak.store(peak, Ordering::Relaxed); - send_realtime_audio_chunk(&tx, input.to_vec(), sample_rate, channels); + { + let playback_queued_samples = Arc::clone(&playback_queued_samples); + let last_peak = Arc::clone(&last_peak); + let tx = tx; + let mut allow_input_until = None; + move |input: &[i16], _| { + let peak = peak_i16(input); + if !should_send_realtime_input( + peak, + &playback_queued_samples, + &mut allow_input_until, + Instant::now(), + ) { + last_peak.store(0, Ordering::Relaxed); + return; + } + last_peak.store(peak, Ordering::Relaxed); + send_realtime_audio_chunk(&tx, input.to_vec(), sample_rate, channels); + } }, move |err| error!("audio input error: {err}"), None, @@ -374,11 +416,26 @@ fn build_realtime_input_stream( cpal::SampleFormat::U16 => device .build_input_stream( &config.clone().into(), - move |input: &[u16], _| { - let mut samples = Vec::with_capacity(input.len()); - let peak = convert_u16_to_i16_and_peak(input, &mut samples); - last_peak.store(peak, Ordering::Relaxed); - send_realtime_audio_chunk(&tx, samples, sample_rate, channels); + { + let playback_queued_samples = Arc::clone(&playback_queued_samples); + let last_peak = Arc::clone(&last_peak); + let tx = tx; + let mut allow_input_until = None; + move |input: &[u16], _| { + let mut samples = Vec::with_capacity(input.len()); + let peak = convert_u16_to_i16_and_peak(input, &mut samples); + if !should_send_realtime_input( + peak, + &playback_queued_samples, + &mut allow_input_until, + Instant::now(), + ) { + last_peak.store(0, Ordering::Relaxed); + return; + } + last_peak.store(peak, Ordering::Relaxed); + send_realtime_audio_chunk(&tx, samples, sample_rate, channels); + } }, move |err| error!("audio input error: {err}"), None, @@ -487,24 +544,33 @@ fn convert_u16_to_i16_and_peak(input: &[u16], out: &mut Vec) -> u16 { pub(crate) struct RealtimeAudioPlayer { _stream: cpal::Stream, queue: Arc>>, + // Mirror the queue depth without locking so the input callback can cheaply + // tell whether playback is still draining. + queued_samples: Arc, output_sample_rate: u32, output_channels: u16, } impl RealtimeAudioPlayer { - pub(crate) fn start(config: &Config) -> Result { + pub(crate) fn start(config: &Config, queued_samples: Arc) -> Result { let (device, config) = crate::audio_device::select_configured_output_device_and_config(config)?; let output_sample_rate = config.sample_rate().0; let output_channels = config.channels(); let queue = Arc::new(Mutex::new(VecDeque::new())); - let stream = build_output_stream(&device, &config, Arc::clone(&queue))?; + let stream = build_output_stream( + &device, + &config, + Arc::clone(&queue), + Arc::clone(&queued_samples), + )?; stream .play() .map_err(|e| format!("failed to start output stream: {e}"))?; Ok(Self { _stream: stream, queue, + queued_samples, output_sample_rate, output_channels, }) @@ -539,12 +605,17 @@ impl RealtimeAudioPlayer { .lock() .map_err(|_| "failed to lock output audio queue".to_string())?; // TODO(aibrahim): Cap or trim this queue if we observe producer bursts outrunning playback. + // Keep the atomic in sync with the buffered PCM so capture can treat any + // queued output as active playback even before the audio callback drains it. + self.queued_samples + .fetch_add(converted.len(), Ordering::Relaxed); guard.extend(converted); Ok(()) } pub(crate) fn clear(&self) { if let Ok(mut guard) = self.queue.lock() { + self.queued_samples.store(0, Ordering::Relaxed); guard.clear(); } } @@ -554,13 +625,14 @@ fn build_output_stream( device: &cpal::Device, config: &cpal::SupportedStreamConfig, queue: Arc>>, + queued_samples: Arc, ) -> Result { let config_any: cpal::StreamConfig = config.clone().into(); match config.sample_format() { cpal::SampleFormat::F32 => device .build_output_stream( &config_any, - move |output: &mut [f32], _| fill_output_f32(output, &queue), + move |output: &mut [f32], _| fill_output_f32(output, &queue, &queued_samples), move |err| error!("audio output error: {err}"), None, ) @@ -568,7 +640,7 @@ fn build_output_stream( cpal::SampleFormat::I16 => device .build_output_stream( &config_any, - move |output: &mut [i16], _| fill_output_i16(output, &queue), + move |output: &mut [i16], _| fill_output_i16(output, &queue, &queued_samples), move |err| error!("audio output error: {err}"), None, ) @@ -576,7 +648,7 @@ fn build_output_stream( cpal::SampleFormat::U16 => device .build_output_stream( &config_any, - move |output: &mut [u16], _| fill_output_u16(output, &queue), + move |output: &mut [u16], _| fill_output_u16(output, &queue, &queued_samples), move |err| error!("audio output error: {err}"), None, ) @@ -585,38 +657,112 @@ fn build_output_stream( } } -fn fill_output_i16(output: &mut [i16], queue: &Arc>>) { +fn fill_output_i16( + output: &mut [i16], + queue: &Arc>>, + queued_samples: &Arc, +) { if let Ok(mut guard) = queue.lock() { + let mut consumed = 0usize; for sample in output { - *sample = guard.pop_front().unwrap_or(0); + *sample = if let Some(next) = guard.pop_front() { + consumed += 1; + next + } else { + 0 + }; + } + if consumed > 0 { + let _ = queued_samples.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |queued| { + Some(queued.saturating_sub(consumed)) + }); } return; } output.fill(0); } -fn fill_output_f32(output: &mut [f32], queue: &Arc>>) { +fn fill_output_f32( + output: &mut [f32], + queue: &Arc>>, + queued_samples: &Arc, +) { if let Ok(mut guard) = queue.lock() { + let mut consumed = 0usize; for sample in output { - let v = guard.pop_front().unwrap_or(0); + let v = if let Some(next) = guard.pop_front() { + consumed += 1; + next + } else { + 0 + }; *sample = (v as f32) / (i16::MAX as f32); } + if consumed > 0 { + let _ = queued_samples.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |queued| { + Some(queued.saturating_sub(consumed)) + }); + } return; } output.fill(0.0); } -fn fill_output_u16(output: &mut [u16], queue: &Arc>>) { +fn fill_output_u16( + output: &mut [u16], + queue: &Arc>>, + queued_samples: &Arc, +) { if let Ok(mut guard) = queue.lock() { + let mut consumed = 0usize; for sample in output { - let v = guard.pop_front().unwrap_or(0); + let v = if let Some(next) = guard.pop_front() { + consumed += 1; + next + } else { + 0 + }; *sample = (v as i32 + 32768).clamp(0, u16::MAX as i32) as u16; } + if consumed > 0 { + let _ = queued_samples.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |queued| { + Some(queued.saturating_sub(consumed)) + }); + } return; } output.fill(32768); } +/// Block quiet mic chunks while assistant playback is still buffered, but once a +/// real interruption is detected keep forwarding input briefly so the full +/// utterance reaches the server. +fn should_send_realtime_input( + peak: u16, + playback_queued_samples: &Arc, + allow_input_until: &mut Option, + now: Instant, +) -> bool { + if playback_queued_samples.load(Ordering::Relaxed) == 0 { + *allow_input_until = None; + return true; + } + + if let Some(deadline) = *allow_input_until { + if now < deadline { + return true; + } + *allow_input_until = None; + } + + if peak >= REALTIME_INTERRUPT_INPUT_PEAK_THRESHOLD { + *allow_input_until = Some(now + REALTIME_INTERRUPT_GRACE_PERIOD); + return true; + } + + false +} + fn convert_pcm16( input: &[i16], input_sample_rate: u32,