[stack 4/4] Reduce realtime self-interruptions during playback (#14827)

## Stack Position
4/4. Top-of-stack sibling built on #14830.

## Base
- #14830

## Sibling
- #14829

## Scope
- Gate low-level mic chunks while speaker playback is active, while
still allowing spoken barge-in.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim 2026-03-16 22:19:51 -07:00 committed by GitHub
parent 79f476e47d
commit 32e4a5d5d9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 227 additions and 49 deletions

View file

@ -5,6 +5,8 @@ use codex_protocol::protocol::RealtimeConversationClosedEvent;
use codex_protocol::protocol::RealtimeConversationRealtimeEvent;
use codex_protocol::protocol::RealtimeConversationStartedEvent;
use codex_protocol::protocol::RealtimeEvent;
#[cfg(not(target_os = "linux"))]
use std::sync::atomic::AtomicUsize;
const REALTIME_CONVERSATION_PROMPT: &str = "You are in a realtime voice conversation in the Codex TUI. Respond conversationally and concisely.";
@ -30,6 +32,10 @@ pub(super) struct RealtimeConversationUiState {
capture: Option<crate::voice::VoiceCapture>,
#[cfg(not(target_os = "linux"))]
audio_player: Option<crate::voice::RealtimeAudioPlayer>,
#[cfg(not(target_os = "linux"))]
// Shared queue depth lets capture suppress echoed speaker audio without
// taking the playback queue lock from the input callback.
playback_queued_samples: Arc<AtomicUsize>,
}
impl RealtimeConversationUiState {
@ -192,16 +198,15 @@ impl ChatWidget {
None
}
fn realtime_footer_hint_items() -> Vec<(String, String)> {
vec![("/realtime".to_string(), "stop live voice".to_string())]
}
pub(super) fn start_realtime_conversation(&mut self) {
self.realtime_conversation.phase = RealtimeConversationPhase::Starting;
self.realtime_conversation.requested_close = false;
self.realtime_conversation.session_id = None;
self.realtime_conversation.warned_audio_only_submission = false;
self.set_footer_hint_override(Some(Self::realtime_footer_hint_items()));
self.set_footer_hint_override(Some(vec![(
"/realtime".to_string(),
"stop live voice".to_string(),
)]));
self.submit_op(Op::RealtimeConversationStart(ConversationStartParams {
prompt: REALTIME_CONVERSATION_PROMPT.to_string(),
session_id: None,
@ -251,7 +256,10 @@ impl ChatWidget {
self.realtime_conversation.phase = RealtimeConversationPhase::Active;
self.realtime_conversation.session_id = ev.session_id;
self.realtime_conversation.warned_audio_only_submission = false;
self.set_footer_hint_override(Some(Self::realtime_footer_hint_items()));
self.set_footer_hint_override(Some(vec![(
"/realtime".to_string(),
"stop live voice".to_string(),
)]));
self.start_realtime_local_audio();
self.request_redraw();
}
@ -264,11 +272,17 @@ impl ChatWidget {
RealtimeEvent::SessionUpdated { session_id, .. } => {
self.realtime_conversation.session_id = Some(session_id);
}
RealtimeEvent::InputAudioSpeechStarted(_) => {}
RealtimeEvent::InputAudioSpeechStarted(_) | RealtimeEvent::ResponseCancelled(_) => {
#[cfg(not(target_os = "linux"))]
if let Some(player) = &self.realtime_conversation.audio_player {
// Once the server detects user speech or the current response is cancelled,
// any buffered assistant audio is stale and should stop gating mic input.
player.clear();
}
}
RealtimeEvent::InputTranscriptDelta(_) => {}
RealtimeEvent::OutputTranscriptDelta(_) => {}
RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame),
RealtimeEvent::ResponseCancelled(_) => {}
RealtimeEvent::ConversationItemAdded(_item) => {}
RealtimeEvent::ConversationItemDone { .. } => {}
RealtimeEvent::HandoffRequested(_) => {}
@ -296,8 +310,11 @@ impl ChatWidget {
#[cfg(not(target_os = "linux"))]
{
if self.realtime_conversation.audio_player.is_none() {
self.realtime_conversation.audio_player =
crate::voice::RealtimeAudioPlayer::start(&self.config).ok();
self.realtime_conversation.audio_player = crate::voice::RealtimeAudioPlayer::start(
&self.config,
Arc::clone(&self.realtime_conversation.playback_queued_samples),
)
.ok();
}
if let Some(player) = &self.realtime_conversation.audio_player
&& let Err(err) = player.enqueue_frame(frame)
@ -324,6 +341,7 @@ impl ChatWidget {
let capture = match crate::voice::VoiceCapture::start_realtime(
&self.config,
self.app_event_tx.clone(),
Arc::clone(&self.realtime_conversation.playback_queued_samples),
) {
Ok(capture) => capture,
Err(err) => {
@ -342,8 +360,11 @@ impl ChatWidget {
self.realtime_conversation.capture_stop_flag = Some(stop_flag.clone());
self.realtime_conversation.capture = Some(capture);
if self.realtime_conversation.audio_player.is_none() {
self.realtime_conversation.audio_player =
crate::voice::RealtimeAudioPlayer::start(&self.config).ok();
self.realtime_conversation.audio_player = crate::voice::RealtimeAudioPlayer::start(
&self.config,
Arc::clone(&self.realtime_conversation.playback_queued_samples),
)
.ok();
}
std::thread::spawn(move || {
@ -381,7 +402,10 @@ impl ChatWidget {
}
RealtimeAudioDeviceKind::Speaker => {
self.stop_realtime_speaker();
match crate::voice::RealtimeAudioPlayer::start(&self.config) {
match crate::voice::RealtimeAudioPlayer::start(
&self.config,
Arc::clone(&self.realtime_conversation.playback_queued_samples),
) {
Ok(player) => {
self.realtime_conversation.audio_player = Some(player);
}

View file

@ -140,6 +140,7 @@ mod voice {
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU16;
use std::sync::atomic::AtomicUsize;
pub struct RecordedAudio {
pub data: Vec<i16>,
@ -158,7 +159,11 @@ mod voice {
Err("voice input is unavailable in this build".to_string())
}
pub fn start_realtime(_config: &Config, _tx: AppEventSender) -> Result<Self, String> {
pub fn start_realtime(
_config: &Config,
_tx: AppEventSender,
_playback_queued_samples: Arc<AtomicUsize>,
) -> Result<Self, String> {
Err("voice input is unavailable in this build".to_string())
}
@ -198,7 +203,10 @@ mod voice {
}
impl RealtimeAudioPlayer {
pub(crate) fn start(_config: &Config) -> Result<Self, String> {
pub(crate) fn start(
_config: &Config,
_queued_samples: Arc<AtomicUsize>,
) -> Result<Self, String> {
Err("voice output is unavailable in this build".to_string())
}

View file

@ -1,5 +1,7 @@
use crate::app_event::AppEvent;
use crate::app_event_sender::AppEventSender;
use crate::audio_device::preferred_input_config;
use crate::audio_device::select_configured_input_device_and_config;
use base64::Engine;
use codex_client::build_reqwest_client_with_custom_ca;
use codex_core::auth::AuthCredentialsStoreMode;
@ -23,7 +25,10 @@ use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU16;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use tracing::error;
use tracing::info;
use tracing::trace;
@ -31,6 +36,13 @@ use tracing::trace;
const AUDIO_MODEL: &str = "gpt-4o-mini-transcribe";
const MODEL_AUDIO_SAMPLE_RATE: u32 = 24_000;
const MODEL_AUDIO_CHANNELS: u16 = 1;
// While playback is buffered, ignore low-level mic input that is likely just
// speaker echo. A user who starts talking over playback should cross this peak
// threshold and reopen the gate.
const REALTIME_INTERRUPT_INPUT_PEAK_THRESHOLD: u16 = 4_000;
// Once an interruption is judged intentional, keep forwarding input for a
// short grace window so trailing syllables are not clipped between callbacks.
const REALTIME_INTERRUPT_GRACE_PERIOD: Duration = Duration::from_millis(900);
struct TranscriptionAuthContext {
mode: AuthMode,
@ -79,8 +91,12 @@ impl VoiceCapture {
})
}
pub fn start_realtime(config: &Config, tx: AppEventSender) -> Result<Self, String> {
let (device, config) = select_realtime_input_device_and_config(config)?;
pub fn start_realtime(
config: &Config,
tx: AppEventSender,
playback_queued_samples: Arc<AtomicUsize>,
) -> Result<Self, String> {
let (device, config) = select_configured_input_device_and_config(config)?;
let sample_rate = config.sample_rate().0;
let channels = config.channels();
@ -94,6 +110,7 @@ impl VoiceCapture {
sample_rate,
channels,
tx,
playback_queued_samples,
last_peak.clone(),
)?;
stream
@ -273,16 +290,10 @@ fn select_default_input_device_and_config()
let device = host
.default_input_device()
.ok_or_else(|| "no input audio device available".to_string())?;
let config = crate::audio_device::preferred_input_config(&device)?;
let config = preferred_input_config(&device)?;
Ok((device, config))
}
fn select_realtime_input_device_and_config(
config: &Config,
) -> Result<(cpal::Device, cpal::SupportedStreamConfig), String> {
crate::audio_device::select_configured_input_device_and_config(config)
}
fn build_input_stream(
device: &cpal::Device,
config: &cpal::SupportedStreamConfig,
@ -343,17 +354,33 @@ fn build_realtime_input_stream(
sample_rate: u32,
channels: u16,
tx: AppEventSender,
playback_queued_samples: Arc<AtomicUsize>,
last_peak: Arc<AtomicU16>,
) -> Result<cpal::Stream, String> {
match config.sample_format() {
cpal::SampleFormat::F32 => device
.build_input_stream(
&config.clone().into(),
move |input: &[f32], _| {
let peak = peak_f32(input);
last_peak.store(peak, Ordering::Relaxed);
let samples = input.iter().copied().map(f32_to_i16).collect::<Vec<_>>();
send_realtime_audio_chunk(&tx, samples, sample_rate, channels);
{
let playback_queued_samples = Arc::clone(&playback_queued_samples);
let last_peak = Arc::clone(&last_peak);
let tx = tx;
let mut allow_input_until = None;
move |input: &[f32], _| {
let peak = peak_f32(input);
if !should_send_realtime_input(
peak,
&playback_queued_samples,
&mut allow_input_until,
Instant::now(),
) {
last_peak.store(0, Ordering::Relaxed);
return;
}
last_peak.store(peak, Ordering::Relaxed);
let samples = input.iter().copied().map(f32_to_i16).collect::<Vec<_>>();
send_realtime_audio_chunk(&tx, samples, sample_rate, channels);
}
},
move |err| error!("audio input error: {err}"),
None,
@ -362,10 +389,25 @@ fn build_realtime_input_stream(
cpal::SampleFormat::I16 => device
.build_input_stream(
&config.clone().into(),
move |input: &[i16], _| {
let peak = peak_i16(input);
last_peak.store(peak, Ordering::Relaxed);
send_realtime_audio_chunk(&tx, input.to_vec(), sample_rate, channels);
{
let playback_queued_samples = Arc::clone(&playback_queued_samples);
let last_peak = Arc::clone(&last_peak);
let tx = tx;
let mut allow_input_until = None;
move |input: &[i16], _| {
let peak = peak_i16(input);
if !should_send_realtime_input(
peak,
&playback_queued_samples,
&mut allow_input_until,
Instant::now(),
) {
last_peak.store(0, Ordering::Relaxed);
return;
}
last_peak.store(peak, Ordering::Relaxed);
send_realtime_audio_chunk(&tx, input.to_vec(), sample_rate, channels);
}
},
move |err| error!("audio input error: {err}"),
None,
@ -374,11 +416,26 @@ fn build_realtime_input_stream(
cpal::SampleFormat::U16 => device
.build_input_stream(
&config.clone().into(),
move |input: &[u16], _| {
let mut samples = Vec::with_capacity(input.len());
let peak = convert_u16_to_i16_and_peak(input, &mut samples);
last_peak.store(peak, Ordering::Relaxed);
send_realtime_audio_chunk(&tx, samples, sample_rate, channels);
{
let playback_queued_samples = Arc::clone(&playback_queued_samples);
let last_peak = Arc::clone(&last_peak);
let tx = tx;
let mut allow_input_until = None;
move |input: &[u16], _| {
let mut samples = Vec::with_capacity(input.len());
let peak = convert_u16_to_i16_and_peak(input, &mut samples);
if !should_send_realtime_input(
peak,
&playback_queued_samples,
&mut allow_input_until,
Instant::now(),
) {
last_peak.store(0, Ordering::Relaxed);
return;
}
last_peak.store(peak, Ordering::Relaxed);
send_realtime_audio_chunk(&tx, samples, sample_rate, channels);
}
},
move |err| error!("audio input error: {err}"),
None,
@ -487,24 +544,33 @@ fn convert_u16_to_i16_and_peak(input: &[u16], out: &mut Vec<i16>) -> u16 {
pub(crate) struct RealtimeAudioPlayer {
_stream: cpal::Stream,
queue: Arc<Mutex<VecDeque<i16>>>,
// Mirror the queue depth without locking so the input callback can cheaply
// tell whether playback is still draining.
queued_samples: Arc<AtomicUsize>,
output_sample_rate: u32,
output_channels: u16,
}
impl RealtimeAudioPlayer {
pub(crate) fn start(config: &Config) -> Result<Self, String> {
pub(crate) fn start(config: &Config, queued_samples: Arc<AtomicUsize>) -> Result<Self, String> {
let (device, config) =
crate::audio_device::select_configured_output_device_and_config(config)?;
let output_sample_rate = config.sample_rate().0;
let output_channels = config.channels();
let queue = Arc::new(Mutex::new(VecDeque::new()));
let stream = build_output_stream(&device, &config, Arc::clone(&queue))?;
let stream = build_output_stream(
&device,
&config,
Arc::clone(&queue),
Arc::clone(&queued_samples),
)?;
stream
.play()
.map_err(|e| format!("failed to start output stream: {e}"))?;
Ok(Self {
_stream: stream,
queue,
queued_samples,
output_sample_rate,
output_channels,
})
@ -539,12 +605,17 @@ impl RealtimeAudioPlayer {
.lock()
.map_err(|_| "failed to lock output audio queue".to_string())?;
// TODO(aibrahim): Cap or trim this queue if we observe producer bursts outrunning playback.
// Keep the atomic in sync with the buffered PCM so capture can treat any
// queued output as active playback even before the audio callback drains it.
self.queued_samples
.fetch_add(converted.len(), Ordering::Relaxed);
guard.extend(converted);
Ok(())
}
pub(crate) fn clear(&self) {
if let Ok(mut guard) = self.queue.lock() {
self.queued_samples.store(0, Ordering::Relaxed);
guard.clear();
}
}
@ -554,13 +625,14 @@ fn build_output_stream(
device: &cpal::Device,
config: &cpal::SupportedStreamConfig,
queue: Arc<Mutex<VecDeque<i16>>>,
queued_samples: Arc<AtomicUsize>,
) -> Result<cpal::Stream, String> {
let config_any: cpal::StreamConfig = config.clone().into();
match config.sample_format() {
cpal::SampleFormat::F32 => device
.build_output_stream(
&config_any,
move |output: &mut [f32], _| fill_output_f32(output, &queue),
move |output: &mut [f32], _| fill_output_f32(output, &queue, &queued_samples),
move |err| error!("audio output error: {err}"),
None,
)
@ -568,7 +640,7 @@ fn build_output_stream(
cpal::SampleFormat::I16 => device
.build_output_stream(
&config_any,
move |output: &mut [i16], _| fill_output_i16(output, &queue),
move |output: &mut [i16], _| fill_output_i16(output, &queue, &queued_samples),
move |err| error!("audio output error: {err}"),
None,
)
@ -576,7 +648,7 @@ fn build_output_stream(
cpal::SampleFormat::U16 => device
.build_output_stream(
&config_any,
move |output: &mut [u16], _| fill_output_u16(output, &queue),
move |output: &mut [u16], _| fill_output_u16(output, &queue, &queued_samples),
move |err| error!("audio output error: {err}"),
None,
)
@ -585,38 +657,112 @@ fn build_output_stream(
}
}
fn fill_output_i16(output: &mut [i16], queue: &Arc<Mutex<VecDeque<i16>>>) {
fn fill_output_i16(
output: &mut [i16],
queue: &Arc<Mutex<VecDeque<i16>>>,
queued_samples: &Arc<AtomicUsize>,
) {
if let Ok(mut guard) = queue.lock() {
let mut consumed = 0usize;
for sample in output {
*sample = guard.pop_front().unwrap_or(0);
*sample = if let Some(next) = guard.pop_front() {
consumed += 1;
next
} else {
0
};
}
if consumed > 0 {
let _ = queued_samples.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |queued| {
Some(queued.saturating_sub(consumed))
});
}
return;
}
output.fill(0);
}
fn fill_output_f32(output: &mut [f32], queue: &Arc<Mutex<VecDeque<i16>>>) {
fn fill_output_f32(
output: &mut [f32],
queue: &Arc<Mutex<VecDeque<i16>>>,
queued_samples: &Arc<AtomicUsize>,
) {
if let Ok(mut guard) = queue.lock() {
let mut consumed = 0usize;
for sample in output {
let v = guard.pop_front().unwrap_or(0);
let v = if let Some(next) = guard.pop_front() {
consumed += 1;
next
} else {
0
};
*sample = (v as f32) / (i16::MAX as f32);
}
if consumed > 0 {
let _ = queued_samples.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |queued| {
Some(queued.saturating_sub(consumed))
});
}
return;
}
output.fill(0.0);
}
fn fill_output_u16(output: &mut [u16], queue: &Arc<Mutex<VecDeque<i16>>>) {
fn fill_output_u16(
output: &mut [u16],
queue: &Arc<Mutex<VecDeque<i16>>>,
queued_samples: &Arc<AtomicUsize>,
) {
if let Ok(mut guard) = queue.lock() {
let mut consumed = 0usize;
for sample in output {
let v = guard.pop_front().unwrap_or(0);
let v = if let Some(next) = guard.pop_front() {
consumed += 1;
next
} else {
0
};
*sample = (v as i32 + 32768).clamp(0, u16::MAX as i32) as u16;
}
if consumed > 0 {
let _ = queued_samples.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |queued| {
Some(queued.saturating_sub(consumed))
});
}
return;
}
output.fill(32768);
}
/// Decide whether a captured mic chunk should be forwarded to the server.
///
/// While assistant playback is still buffered, quiet input is assumed to be
/// speaker echo and is dropped. A sufficiently loud chunk is treated as a
/// deliberate barge-in and opens a short grace window so the rest of the
/// utterance keeps flowing even if later chunks are quieter.
fn should_send_realtime_input(
    peak: u16,
    playback_queued_samples: &Arc<AtomicUsize>,
    allow_input_until: &mut Option<Instant>,
    now: Instant,
) -> bool {
    let playback_active = playback_queued_samples.load(Ordering::Relaxed) > 0;
    if !playback_active {
        // Nothing queued for the speaker: the gate is fully open and any
        // leftover grace window is irrelevant.
        *allow_input_until = None;
        return true;
    }
    // An unexpired grace window keeps the gate open regardless of volume;
    // an expired one is discarded before the volume check below.
    match *allow_input_until {
        Some(deadline) if now < deadline => return true,
        Some(_) => *allow_input_until = None,
        None => {}
    }
    if peak < REALTIME_INTERRUPT_INPUT_PEAK_THRESHOLD {
        // Quiet while playback is draining: most likely echo, drop it.
        return false;
    }
    // Loud enough to be real speech: forward it and open a fresh window.
    *allow_input_until = Some(now + REALTIME_INTERRUPT_GRACE_PERIOD);
    true
}
fn convert_pcm16(
input: &[i16],
input_sample_rate: u32,