diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index e3d93cf96..c6689710b 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -367,6 +367,9 @@ "prevent_idle_sleep": { "type": "boolean" }, + "realtime_conversation": { + "type": "boolean" + }, "remote_models": { "type": "boolean" }, @@ -1657,6 +1660,9 @@ "prevent_idle_sleep": { "type": "boolean" }, + "realtime_conversation": { + "type": "boolean" + }, "remote_models": { "type": "boolean" }, diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 990aa95fa..d3bace851 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -144,6 +144,8 @@ pub enum Feature { Personality, /// Enable voice transcription in the TUI composer. VoiceTranscription, + /// Enable experimental realtime voice conversation mode in the TUI. + RealtimeConversation, /// Prevent idle system sleep while a turn is actively running. PreventIdleSleep, /// Use the Responses API WebSocket transport for OpenAI by default. 
@@ -651,6 +653,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::UnderDevelopment, default_enabled: false, }, + FeatureSpec { + id: Feature::RealtimeConversation, + key: "realtime_conversation", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::PreventIdleSleep, key: "prevent_idle_sleep", diff --git a/codex-rs/tui/src/bottom_pane/chat_composer.rs b/codex-rs/tui/src/bottom_pane/chat_composer.rs index 3c7467d4a..8d9e4b250 100644 --- a/codex-rs/tui/src/bottom_pane/chat_composer.rs +++ b/codex-rs/tui/src/bottom_pane/chat_composer.rs @@ -389,6 +389,7 @@ pub(crate) struct ChatComposer { collaboration_mode_indicator: Option, connectors_enabled: bool, personality_command_enabled: bool, + realtime_conversation_enabled: bool, windows_degraded_sandbox_active: bool, status_line_value: Option>, status_line_enabled: bool, @@ -494,6 +495,7 @@ impl ChatComposer { collaboration_mode_indicator: None, connectors_enabled: false, personality_command_enabled: false, + realtime_conversation_enabled: false, windows_degraded_sandbox_active: false, status_line_value: None, status_line_enabled: false, @@ -578,6 +580,10 @@ impl ChatComposer { self.personality_command_enabled = enabled; } + pub fn set_realtime_conversation_enabled(&mut self, enabled: bool) { + self.realtime_conversation_enabled = enabled; + } + pub fn set_voice_transcription_enabled(&mut self, enabled: bool) { self.voice_state.transcription_enabled = enabled; if !enabled { @@ -2260,6 +2266,7 @@ impl ChatComposer { self.collaboration_modes_enabled, self.connectors_enabled, self.personality_command_enabled, + self.realtime_conversation_enabled, self.windows_degraded_sandbox_active, ) .is_some(); @@ -2459,6 +2466,7 @@ impl ChatComposer { self.collaboration_modes_enabled, self.connectors_enabled, self.personality_command_enabled, + self.realtime_conversation_enabled, self.windows_degraded_sandbox_active, ) { @@ -2493,6 +2501,7 @@ impl ChatComposer { 
self.collaboration_modes_enabled, self.connectors_enabled, self.personality_command_enabled, + self.realtime_conversation_enabled, self.windows_degraded_sandbox_active, )?; @@ -3325,6 +3334,7 @@ impl ChatComposer { self.collaboration_modes_enabled, self.connectors_enabled, self.personality_command_enabled, + self.realtime_conversation_enabled, self.windows_degraded_sandbox_active, ) .is_some(); @@ -3386,6 +3396,7 @@ impl ChatComposer { self.collaboration_modes_enabled, self.connectors_enabled, self.personality_command_enabled, + self.realtime_conversation_enabled, self.windows_degraded_sandbox_active, ) { return true; @@ -3439,12 +3450,14 @@ impl ChatComposer { let collaboration_modes_enabled = self.collaboration_modes_enabled; let connectors_enabled = self.connectors_enabled; let personality_command_enabled = self.personality_command_enabled; + let realtime_conversation_enabled = self.realtime_conversation_enabled; let mut command_popup = CommandPopup::new( self.custom_prompts.clone(), CommandPopupFlags { collaboration_modes_enabled, connectors_enabled, personality_command_enabled, + realtime_conversation_enabled, windows_degraded_sandbox_active: self.windows_degraded_sandbox_active, }, ); @@ -3955,6 +3968,13 @@ impl ChatComposer { self.textarea.update_named_element_by_id(id, text) } + #[cfg(not(target_os = "linux"))] + pub fn insert_transcription_placeholder(&mut self, text: &str) -> String { + let id = self.next_id(); + self.textarea.insert_named_element(text, id.clone()); + id + } + pub fn remove_transcription_placeholder(&mut self, id: &str) { self.stop_transcription_spinner(id); let _ = self.textarea.replace_element_by_id(id, ""); diff --git a/codex-rs/tui/src/bottom_pane/command_popup.rs b/codex-rs/tui/src/bottom_pane/command_popup.rs index 48b6ffedc..83b523cc2 100644 --- a/codex-rs/tui/src/bottom_pane/command_popup.rs +++ b/codex-rs/tui/src/bottom_pane/command_popup.rs @@ -39,6 +39,7 @@ pub(crate) struct CommandPopupFlags { pub(crate) 
collaboration_modes_enabled: bool, pub(crate) connectors_enabled: bool, pub(crate) personality_command_enabled: bool, + pub(crate) realtime_conversation_enabled: bool, pub(crate) windows_degraded_sandbox_active: bool, } @@ -49,6 +50,7 @@ impl CommandPopup { flags.collaboration_modes_enabled, flags.connectors_enabled, flags.personality_command_enabled, + flags.realtime_conversation_enabled, flags.windows_degraded_sandbox_active, ) .into_iter() @@ -495,6 +497,7 @@ mod tests { collaboration_modes_enabled: true, connectors_enabled: false, personality_command_enabled: true, + realtime_conversation_enabled: false, windows_degraded_sandbox_active: false, }, ); @@ -514,6 +517,7 @@ mod tests { collaboration_modes_enabled: true, connectors_enabled: false, personality_command_enabled: true, + realtime_conversation_enabled: false, windows_degraded_sandbox_active: false, }, ); @@ -533,6 +537,7 @@ mod tests { collaboration_modes_enabled: true, connectors_enabled: false, personality_command_enabled: false, + realtime_conversation_enabled: false, windows_degraded_sandbox_active: false, }, ); @@ -560,6 +565,7 @@ mod tests { collaboration_modes_enabled: true, connectors_enabled: false, personality_command_enabled: true, + realtime_conversation_enabled: false, windows_degraded_sandbox_active: false, }, ); diff --git a/codex-rs/tui/src/bottom_pane/mod.rs b/codex-rs/tui/src/bottom_pane/mod.rs index 28cd1ddc7..680129236 100644 --- a/codex-rs/tui/src/bottom_pane/mod.rs +++ b/codex-rs/tui/src/bottom_pane/mod.rs @@ -292,6 +292,11 @@ impl BottomPane { self.request_redraw(); } + pub fn set_realtime_conversation_enabled(&mut self, enabled: bool) { + self.composer.set_realtime_conversation_enabled(enabled); + self.request_redraw(); + } + pub fn set_voice_transcription_enabled(&mut self, enabled: bool) { self.composer.set_voice_transcription_enabled(enabled); self.request_redraw(); @@ -1007,6 +1012,13 @@ impl BottomPane { #[cfg(not(target_os = "linux"))] impl BottomPane { + pub(crate) fn 
insert_transcription_placeholder(&mut self, text: &str) -> String { + let id = self.composer.insert_transcription_placeholder(text); + self.composer.sync_popups(); + self.request_redraw(); + id + } + pub(crate) fn replace_transcription(&mut self, id: &str, text: &str) { self.composer.replace_transcription(id, text); self.composer.sync_popups(); diff --git a/codex-rs/tui/src/bottom_pane/slash_commands.rs b/codex-rs/tui/src/bottom_pane/slash_commands.rs index 736eb3102..86c131e6d 100644 --- a/codex-rs/tui/src/bottom_pane/slash_commands.rs +++ b/codex-rs/tui/src/bottom_pane/slash_commands.rs @@ -13,6 +13,7 @@ pub(crate) fn builtins_for_input( collaboration_modes_enabled: bool, connectors_enabled: bool, personality_command_enabled: bool, + realtime_conversation_enabled: bool, allow_elevate_sandbox: bool, ) -> Vec<(&'static str, SlashCommand)> { built_in_slash_commands() @@ -24,6 +25,7 @@ pub(crate) fn builtins_for_input( }) .filter(|(_, cmd)| connectors_enabled || *cmd != SlashCommand::Apps) .filter(|(_, cmd)| personality_command_enabled || *cmd != SlashCommand::Personality) + .filter(|(_, cmd)| realtime_conversation_enabled || *cmd != SlashCommand::Realtime) .collect() } @@ -33,12 +35,14 @@ pub(crate) fn find_builtin_command( collaboration_modes_enabled: bool, connectors_enabled: bool, personality_command_enabled: bool, + realtime_conversation_enabled: bool, allow_elevate_sandbox: bool, ) -> Option { builtins_for_input( collaboration_modes_enabled, connectors_enabled, personality_command_enabled, + realtime_conversation_enabled, allow_elevate_sandbox, ) .into_iter() @@ -52,12 +56,14 @@ pub(crate) fn has_builtin_prefix( collaboration_modes_enabled: bool, connectors_enabled: bool, personality_command_enabled: bool, + realtime_conversation_enabled: bool, allow_elevate_sandbox: bool, ) -> bool { builtins_for_input( collaboration_modes_enabled, connectors_enabled, personality_command_enabled, + realtime_conversation_enabled, allow_elevate_sandbox, ) .into_iter() @@ -71,15 
+77,23 @@ mod tests { #[test] fn debug_command_still_resolves_for_dispatch() { - let cmd = find_builtin_command("debug-config", true, true, true, false); + let cmd = find_builtin_command("debug-config", true, true, true, false, false); assert_eq!(cmd, Some(SlashCommand::DebugConfig)); } #[test] fn clear_command_resolves_for_dispatch() { assert_eq!( - find_builtin_command("clear", true, true, true, false), + find_builtin_command("clear", true, true, true, false, false), Some(SlashCommand::Clear) ); } + + #[test] + fn realtime_command_is_hidden_when_realtime_is_disabled() { + assert_eq!( + find_builtin_command("realtime", true, true, true, false, false), + None + ); + } } diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index a67cc2d4b..8e9f12538 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -259,6 +259,9 @@ mod skills; use self::skills::collect_tool_mentions; use self::skills::find_app_mentions; use self::skills::find_skill_mentions_with_tool_mentions; +mod realtime; +use self::realtime::RealtimeConversationUiState; +use self::realtime::RenderedUserMessageEvent; use crate::mention_codec::LinkedMention; use crate::mention_codec::encode_history_mentions; use crate::streaming::chunking::AdaptiveChunkingPolicy; @@ -665,6 +668,8 @@ pub(crate) struct ChatWidget { // True once we've attempted a branch lookup for the current CWD. status_line_branch_lookup_complete: bool, external_editor_state: ExternalEditorState, + realtime_conversation: RealtimeConversationUiState, + last_rendered_user_message_event: Option, } /// Snapshot of active-cell state that affects transcript overlay rendering. @@ -842,6 +847,11 @@ enum ReplayKind { } impl ChatWidget { + fn realtime_conversation_enabled(&self) -> bool { + self.config.features.enabled(Feature::RealtimeConversation) + && cfg!(not(target_os = "linux")) + } + /// Synchronize the bottom-pane "task running" indicator with the current lifecycles. 
/// /// The bottom pane only has one running flag, but this module treats it as a derived state of @@ -2847,6 +2857,8 @@ impl ChatWidget { status_line_branch_pending: false, status_line_branch_lookup_complete: false, external_editor_state: ExternalEditorState::Closed, + realtime_conversation: RealtimeConversationUiState::default(), + last_rendered_user_message_event: None, }; widget.prefetch_rate_limits(); @@ -2856,6 +2868,9 @@ impl ChatWidget { widget.bottom_pane.set_voice_transcription_enabled( widget.config.features.enabled(Feature::VoiceTranscription), ); + widget + .bottom_pane + .set_realtime_conversation_enabled(widget.realtime_conversation_enabled()); widget .bottom_pane .set_status_line_enabled(!widget.configured_status_line_items().is_empty()); @@ -3018,6 +3033,8 @@ impl ChatWidget { status_line_branch_pending: false, status_line_branch_lookup_complete: false, external_editor_state: ExternalEditorState::Closed, + realtime_conversation: RealtimeConversationUiState::default(), + last_rendered_user_message_event: None, }; widget.prefetch_rate_limits(); @@ -3027,6 +3044,9 @@ impl ChatWidget { widget.bottom_pane.set_voice_transcription_enabled( widget.config.features.enabled(Feature::VoiceTranscription), ); + widget + .bottom_pane + .set_realtime_conversation_enabled(widget.realtime_conversation_enabled()); widget .bottom_pane .set_status_line_enabled(!widget.configured_status_line_items().is_empty()); @@ -3178,6 +3198,8 @@ impl ChatWidget { status_line_branch_pending: false, status_line_branch_lookup_complete: false, external_editor_state: ExternalEditorState::Closed, + realtime_conversation: RealtimeConversationUiState::default(), + last_rendered_user_message_event: None, }; widget.prefetch_rate_limits(); @@ -3187,6 +3209,9 @@ impl ChatWidget { widget.bottom_pane.set_voice_transcription_enabled( widget.config.features.enabled(Feature::VoiceTranscription), ); + widget + .bottom_pane + .set_realtime_conversation_enabled(widget.realtime_conversation_enabled()); 
widget .bottom_pane .set_status_line_enabled(!widget.configured_status_line_items().is_empty()); @@ -3308,6 +3333,11 @@ impl ChatWidget { .bottom_pane .take_recent_submission_mention_bindings(), }; + let Some(user_message) = + self.maybe_defer_user_message_for_realtime(user_message) + else { + return; + }; // Steer submissions during active final-answer streaming can race with turn // completion and strand the UI in a running state. Queue those inputs instead // of injecting immediately; `on_task_complete()` drains this FIFO via @@ -3343,6 +3373,11 @@ impl ChatWidget { .bottom_pane .take_recent_submission_mention_bindings(), }; + let Some(user_message) = + self.maybe_defer_user_message_for_realtime(user_message) + else { + return; + }; self.queue_user_message(user_message); } InputResult::Command(cmd) => { @@ -3479,6 +3514,16 @@ impl ChatWidget { SlashCommand::Model => { self.open_model_popup(); } + SlashCommand::Realtime => { + if !self.realtime_conversation_enabled() { + return; + } + if self.realtime_conversation.is_live() { + self.request_realtime_conversation_close(None); + } else { + self.start_realtime_conversation(); + } + } SlashCommand::Personality => { self.open_personality_popup(); } @@ -4086,7 +4131,17 @@ impl ChatWidget { // Show replayable user content in conversation history. 
if !text.is_empty() { - let local_image_paths = local_images.into_iter().map(|img| img.path).collect(); + let local_image_paths = local_images + .into_iter() + .map(|img| img.path) + .collect::>(); + self.last_rendered_user_message_event = + Some(Self::rendered_user_message_event_from_parts( + text.clone(), + text_elements.clone(), + local_image_paths.clone(), + remote_image_urls.clone(), + )); self.add_to_history(history_cell::new_user_prompt( text, text_elements, @@ -4094,6 +4149,13 @@ impl ChatWidget { remote_image_urls, )); } else if !remote_image_urls.is_empty() { + self.last_rendered_user_message_event = + Some(Self::rendered_user_message_event_from_parts( + String::new(), + Vec::new(), + Vec::new(), + remote_image_urls.clone(), + )); self.add_to_history(history_cell::new_user_prompt( String::new(), Vec::new(), @@ -4316,7 +4378,7 @@ impl ChatWidget { } } EventMsg::UserMessage(ev) => { - if from_replay { + if from_replay || self.should_render_realtime_user_message_event(&ev) { self.on_user_message_event(ev); } } @@ -4351,11 +4413,23 @@ impl ChatWidget { | EventMsg::AgentMessageContentDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) - | EventMsg::RealtimeConversationStarted(_) - | EventMsg::RealtimeConversationRealtime(_) - | EventMsg::RealtimeConversationClosed(_) | EventMsg::DynamicToolCallRequest(_) | EventMsg::SkillRequestApproval(_) => {} + EventMsg::RealtimeConversationStarted(ev) => { + if !from_replay { + self.on_realtime_conversation_started(ev); + } + } + EventMsg::RealtimeConversationRealtime(ev) => { + if !from_replay { + self.on_realtime_conversation_realtime(ev); + } + } + EventMsg::RealtimeConversationClosed(ev) => { + if !from_replay { + self.on_realtime_conversation_closed(ev); + } + } EventMsg::ItemCompleted(event) => { let item = event.item; if let codex_protocol::items::TurnItem::Plan(plan_item) = &item { @@ -4426,6 +4500,8 @@ impl ChatWidget { } fn on_user_message_event(&mut self, event: 
UserMessageEvent) { + self.last_rendered_user_message_event = + Some(Self::rendered_user_message_event_from_event(&event)); let remote_image_urls = event.images.unwrap_or_default(); if !event.message.trim().is_empty() || !event.text_elements.is_empty() @@ -6401,6 +6477,17 @@ impl ChatWidget { if feature == Feature::VoiceTranscription { self.bottom_pane.set_voice_transcription_enabled(enabled); } + if feature == Feature::RealtimeConversation { + let realtime_conversation_enabled = self.realtime_conversation_enabled(); + self.bottom_pane + .set_realtime_conversation_enabled(realtime_conversation_enabled); + if !realtime_conversation_enabled && self.realtime_conversation.is_live() { + self.request_realtime_conversation_close(Some( + "Realtime voice mode was closed because the feature was disabled.".to_string(), + )); + self.reset_realtime_conversation_state(); + } + } if feature == Feature::Personality { self.sync_personality_command_enabled(); } @@ -7588,6 +7675,7 @@ fn has_websocket_timing_metrics(summary: RuntimeMetricsSummary) -> bool { impl Drop for ChatWidget { fn drop(&mut self) { + self.reset_realtime_conversation_state(); self.stop_rate_limit_poller(); } } diff --git a/codex-rs/tui/src/chatwidget/realtime.rs b/codex-rs/tui/src/chatwidget/realtime.rs new file mode 100644 index 000000000..2cf97188b --- /dev/null +++ b/codex-rs/tui/src/chatwidget/realtime.rs @@ -0,0 +1,298 @@ +use super::*; +use codex_protocol::protocol::ConversationStartParams; +use codex_protocol::protocol::RealtimeAudioFrame; +use codex_protocol::protocol::RealtimeConversationClosedEvent; +use codex_protocol::protocol::RealtimeConversationRealtimeEvent; +use codex_protocol::protocol::RealtimeConversationStartedEvent; +use codex_protocol::protocol::RealtimeEvent; + +const REALTIME_CONVERSATION_PROMPT: &str = "You are in a realtime voice conversation in the Codex TUI. 
Respond conversationally and concisely."; + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub(super) enum RealtimeConversationPhase { + #[default] + Inactive, + Starting, + Active, + Stopping, +} + +#[derive(Default)] +pub(super) struct RealtimeConversationUiState { + phase: RealtimeConversationPhase, + requested_close: bool, + session_id: Option, + warned_audio_only_submission: bool, + meter_placeholder_id: Option, + #[cfg(not(target_os = "linux"))] + capture_stop_flag: Option>, + #[cfg(not(target_os = "linux"))] + capture: Option, + #[cfg(not(target_os = "linux"))] + audio_player: Option, +} + +impl RealtimeConversationUiState { + pub(super) fn is_live(&self) -> bool { + matches!( + self.phase, + RealtimeConversationPhase::Starting + | RealtimeConversationPhase::Active + | RealtimeConversationPhase::Stopping + ) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub(super) struct RenderedUserMessageEvent { + message: String, + remote_image_urls: Vec, + local_images: Vec, + text_elements: Vec, +} + +impl ChatWidget { + pub(super) fn rendered_user_message_event_from_parts( + message: String, + text_elements: Vec, + local_images: Vec, + remote_image_urls: Vec, + ) -> RenderedUserMessageEvent { + RenderedUserMessageEvent { + message, + remote_image_urls, + local_images, + text_elements, + } + } + + pub(super) fn rendered_user_message_event_from_event( + event: &UserMessageEvent, + ) -> RenderedUserMessageEvent { + Self::rendered_user_message_event_from_parts( + event.message.clone(), + event.text_elements.clone(), + event.local_images.clone(), + event.images.clone().unwrap_or_default(), + ) + } + + pub(super) fn should_render_realtime_user_message_event( + &self, + event: &UserMessageEvent, + ) -> bool { + if !self.realtime_conversation.is_live() { + return false; + } + let key = Self::rendered_user_message_event_from_event(event); + self.last_rendered_user_message_event.as_ref() != Some(&key) + } + + pub(super) fn maybe_defer_user_message_for_realtime( + &mut 
self, + user_message: UserMessage, + ) -> Option { + if !self.realtime_conversation.is_live() { + return Some(user_message); + } + + self.restore_user_message_to_composer(user_message); + if !self.realtime_conversation.warned_audio_only_submission { + self.realtime_conversation.warned_audio_only_submission = true; + self.add_info_message( + "Realtime voice mode is audio-only. Use /realtime to stop.".to_string(), + None, + ); + } else { + self.request_redraw(); + } + + None + } + + fn realtime_footer_hint_items() -> Vec<(String, String)> { + vec![("/realtime".to_string(), "stop live voice".to_string())] + } + + pub(super) fn start_realtime_conversation(&mut self) { + self.realtime_conversation.phase = RealtimeConversationPhase::Starting; + self.realtime_conversation.requested_close = false; + self.realtime_conversation.session_id = None; + self.realtime_conversation.warned_audio_only_submission = false; + self.set_footer_hint_override(Some(Self::realtime_footer_hint_items())); + self.submit_op(Op::RealtimeConversationStart(ConversationStartParams { + prompt: REALTIME_CONVERSATION_PROMPT.to_string(), + session_id: None, + })); + self.request_redraw(); + } + + pub(super) fn request_realtime_conversation_close(&mut self, info_message: Option) { + if !self.realtime_conversation.is_live() { + if let Some(message) = info_message { + self.add_info_message(message, None); + } + return; + } + + self.realtime_conversation.requested_close = true; + self.realtime_conversation.phase = RealtimeConversationPhase::Stopping; + self.submit_op(Op::RealtimeConversationClose); + self.stop_realtime_local_audio(); + self.set_footer_hint_override(None); + + if let Some(message) = info_message { + self.add_info_message(message, None); + } else { + self.request_redraw(); + } + } + + pub(super) fn reset_realtime_conversation_state(&mut self) { + self.stop_realtime_local_audio(); + self.set_footer_hint_override(None); + self.realtime_conversation.phase = RealtimeConversationPhase::Inactive; + 
self.realtime_conversation.requested_close = false; + self.realtime_conversation.session_id = None; + self.realtime_conversation.warned_audio_only_submission = false; + } + + pub(super) fn on_realtime_conversation_started( + &mut self, + ev: RealtimeConversationStartedEvent, + ) { + if !self.realtime_conversation_enabled() { + self.submit_op(Op::RealtimeConversationClose); + self.reset_realtime_conversation_state(); + return; + } + self.realtime_conversation.phase = RealtimeConversationPhase::Active; + self.realtime_conversation.session_id = ev.session_id; + self.realtime_conversation.warned_audio_only_submission = false; + self.set_footer_hint_override(Some(Self::realtime_footer_hint_items())); + self.start_realtime_local_audio(); + self.request_redraw(); + } + + pub(super) fn on_realtime_conversation_realtime( + &mut self, + ev: RealtimeConversationRealtimeEvent, + ) { + match ev.payload { + RealtimeEvent::SessionCreated { session_id } => { + self.realtime_conversation.session_id = Some(session_id); + } + RealtimeEvent::SessionUpdated { .. 
} => {} + RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame), + RealtimeEvent::ConversationItemAdded(_item) => {} + RealtimeEvent::Error(message) => { + self.add_error_message(format!("Realtime voice error: {message}")); + self.reset_realtime_conversation_state(); + } + } + } + + pub(super) fn on_realtime_conversation_closed(&mut self, ev: RealtimeConversationClosedEvent) { + let requested = self.realtime_conversation.requested_close; + let reason = ev.reason; + self.reset_realtime_conversation_state(); + if !requested && let Some(reason) = reason { + self.add_info_message(format!("Realtime voice mode closed: {reason}"), None); + } + self.request_redraw(); + } + + fn enqueue_realtime_audio_out(&mut self, frame: &RealtimeAudioFrame) { + #[cfg(not(target_os = "linux"))] + { + if self.realtime_conversation.audio_player.is_none() { + self.realtime_conversation.audio_player = + crate::voice::RealtimeAudioPlayer::start().ok(); + } + if let Some(player) = &self.realtime_conversation.audio_player + && let Err(err) = player.enqueue_frame(frame) + { + warn!("failed to play realtime audio: {err}"); + } + } + #[cfg(target_os = "linux")] + { + let _ = frame; + } + } + + #[cfg(not(target_os = "linux"))] + fn start_realtime_local_audio(&mut self) { + if self.realtime_conversation.capture_stop_flag.is_some() { + return; + } + + let placeholder_id = self.bottom_pane.insert_transcription_placeholder("тадтадтадтад"); + self.realtime_conversation.meter_placeholder_id = Some(placeholder_id.clone()); + self.request_redraw(); + + let capture = match crate::voice::VoiceCapture::start_realtime(self.app_event_tx.clone()) { + Ok(capture) => capture, + Err(err) => { + self.remove_transcription_placeholder(&placeholder_id); + self.realtime_conversation.meter_placeholder_id = None; + self.add_error_message(format!("Failed to start microphone capture: {err}")); + return; + } + }; + + let stop_flag = capture.stopped_flag(); + let peak = capture.last_peak_arc(); + let 
meter_placeholder_id = placeholder_id; + let app_event_tx = self.app_event_tx.clone(); + + self.realtime_conversation.capture_stop_flag = Some(stop_flag.clone()); + self.realtime_conversation.capture = Some(capture); + if self.realtime_conversation.audio_player.is_none() { + self.realtime_conversation.audio_player = + crate::voice::RealtimeAudioPlayer::start().ok(); + } + + std::thread::spawn(move || { + let mut meter = crate::voice::RecordingMeterState::new(); + + loop { + if stop_flag.load(Ordering::Relaxed) { + break; + } + + let meter_text = meter.next_text(peak.load(Ordering::Relaxed)); + app_event_tx.send(AppEvent::UpdateRecordingMeter { + id: meter_placeholder_id.clone(), + text: meter_text, + }); + + std::thread::sleep(Duration::from_millis(60)); + } + }); + } + + #[cfg(target_os = "linux")] + fn start_realtime_local_audio(&mut self) {} + + #[cfg(not(target_os = "linux"))] + fn stop_realtime_local_audio(&mut self) { + if let Some(flag) = self.realtime_conversation.capture_stop_flag.take() { + flag.store(true, Ordering::Relaxed); + } + if let Some(capture) = self.realtime_conversation.capture.take() { + let _ = capture.stop(); + } + if let Some(id) = self.realtime_conversation.meter_placeholder_id.take() { + self.remove_transcription_placeholder(&id); + } + if let Some(player) = self.realtime_conversation.audio_player.take() { + player.clear(); + } + } + + #[cfg(target_os = "linux")] + fn stop_realtime_local_audio(&mut self) { + self.realtime_conversation.meter_placeholder_id = None; + } +} diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 8f6bf7176..6b7635508 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -1720,6 +1720,8 @@ async fn make_chatwidget_manual( status_line_branch_pending: false, status_line_branch_lookup_complete: false, external_editor_state: ExternalEditorState::Closed, + realtime_conversation: RealtimeConversationUiState::default(), + 
last_rendered_user_message_event: None, }; widget.set_model(&resolved_model); (widget, rx, op_rx) diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 76a7a853c..1d8349cf7 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -120,6 +120,7 @@ mod voice; mod voice { use crate::app_event::AppEvent; use crate::app_event_sender::AppEventSender; + use codex_protocol::protocol::RealtimeAudioFrame; use std::sync::Arc; use std::sync::Mutex; use std::sync::atomic::AtomicBool; @@ -135,11 +136,17 @@ mod voice { pub(crate) struct RecordingMeterState; + pub(crate) struct RealtimeAudioPlayer; + impl VoiceCapture { pub fn start() -> Result { Err("voice input is unavailable in this build".to_string()) } + pub fn start_realtime(_tx: AppEventSender) -> Result { + Err("voice input is unavailable in this build".to_string()) + } + pub fn stop(self) -> Result { Err("voice input is unavailable in this build".to_string()) } @@ -175,6 +182,18 @@ mod voice { } } + impl RealtimeAudioPlayer { + pub(crate) fn start() -> Result { + Err("voice output is unavailable in this build".to_string()) + } + + pub(crate) fn enqueue_frame(&self, _frame: &RealtimeAudioFrame) -> Result<(), String> { + Err("voice output is unavailable in this build".to_string()) + } + + pub(crate) fn clear(&self) {} + } + pub fn transcribe_async( id: String, _audio: RecordedAudio, diff --git a/codex-rs/tui/src/slash_command.rs b/codex-rs/tui/src/slash_command.rs index e4ee2dc76..468303ceb 100644 --- a/codex-rs/tui/src/slash_command.rs +++ b/codex-rs/tui/src/slash_command.rs @@ -49,6 +49,7 @@ pub enum SlashCommand { Clean, Clear, Personality, + Realtime, TestApproval, // Debugging commands. 
#[strum(serialize = "debug-m-drop")] @@ -85,6 +86,7 @@ impl SlashCommand { SlashCommand::MemoryUpdate => "DO NOT USE", SlashCommand::Model => "choose what model and reasoning effort to use", SlashCommand::Personality => "choose a communication style for Codex", + SlashCommand::Realtime => "toggle realtime voice mode (experimental)", SlashCommand::Plan => "switch to Plan mode", SlashCommand::Collab => "change collaboration mode (experimental)", SlashCommand::Agent => "switch the active agent thread", @@ -157,6 +159,7 @@ impl SlashCommand { | SlashCommand::Exit => true, SlashCommand::Rollout => true, SlashCommand::TestApproval => true, + SlashCommand::Realtime => true, SlashCommand::Collab => true, SlashCommand::Agent => true, SlashCommand::Statusline => false, diff --git a/codex-rs/tui/src/voice.rs b/codex-rs/tui/src/voice.rs index 6852fa6ff..443ccc88a 100644 --- a/codex-rs/tui/src/voice.rs +++ b/codex-rs/tui/src/voice.rs @@ -1,11 +1,15 @@ use crate::app_event::AppEvent; use crate::app_event_sender::AppEventSender; +use base64::Engine; use codex_core::auth::AuthCredentialsStoreMode; use codex_core::config::Config; use codex_core::config::find_codex_home; use codex_core::default_client::get_codex_user_agent; use codex_login::AuthMode; use codex_login::CodexAuth; +use codex_protocol::protocol::ConversationAudioParams; +use codex_protocol::protocol::Op; +use codex_protocol::protocol::RealtimeAudioFrame; use cpal::traits::DeviceTrait; use cpal::traits::HostTrait; use cpal::traits::StreamTrait; @@ -70,6 +74,37 @@ impl VoiceCapture { }) } + pub fn start_realtime(tx: AppEventSender) -> Result { + let (device, config) = select_input_device_and_config()?; + + let sample_rate = config.sample_rate().0; + let channels = config.channels(); + let data: Arc>> = Arc::new(Mutex::new(Vec::new())); + let stopped = Arc::new(AtomicBool::new(false)); + let last_peak = Arc::new(AtomicU16::new(0)); + + let stream = build_realtime_input_stream( + &device, + &config, + sample_rate, + 
channels, + tx, + last_peak.clone(), + )?; + stream + .play() + .map_err(|e| format!("failed to start input stream: {e}"))?; + + Ok(Self { + stream: Some(stream), + sample_rate, + channels, + data, + stopped, + last_peak, + }) + } + pub fn stop(mut self) -> Result { // Mark stopped so any metering task can exit cleanly. self.stopped.store(true, Ordering::SeqCst); @@ -292,6 +327,87 @@ fn build_input_stream( } } +fn build_realtime_input_stream( + device: &cpal::Device, + config: &cpal::SupportedStreamConfig, + sample_rate: u32, + channels: u16, + tx: AppEventSender, + last_peak: Arc, +) -> Result { + match config.sample_format() { + cpal::SampleFormat::F32 => device + .build_input_stream( + &config.clone().into(), + move |input: &[f32], _| { + let peak = peak_f32(input); + last_peak.store(peak, Ordering::Relaxed); + let samples = input.iter().copied().map(f32_to_i16).collect::>(); + send_realtime_audio_chunk(&tx, samples, sample_rate, channels); + }, + move |err| error!("audio input error: {err}"), + None, + ) + .map_err(|e| format!("failed to build input stream: {e}")), + cpal::SampleFormat::I16 => device + .build_input_stream( + &config.clone().into(), + move |input: &[i16], _| { + let peak = peak_i16(input); + last_peak.store(peak, Ordering::Relaxed); + send_realtime_audio_chunk(&tx, input.to_vec(), sample_rate, channels); + }, + move |err| error!("audio input error: {err}"), + None, + ) + .map_err(|e| format!("failed to build input stream: {e}")), + cpal::SampleFormat::U16 => device + .build_input_stream( + &config.clone().into(), + move |input: &[u16], _| { + let mut samples = Vec::with_capacity(input.len()); + let peak = convert_u16_to_i16_and_peak(input, &mut samples); + last_peak.store(peak, Ordering::Relaxed); + send_realtime_audio_chunk(&tx, samples, sample_rate, channels); + }, + move |err| error!("audio input error: {err}"), + None, + ) + .map_err(|e| format!("failed to build input stream: {e}")), + _ => Err("unsupported input sample format".to_string()), 
+ } +} + +fn send_realtime_audio_chunk( + tx: &AppEventSender, + samples: Vec<i16>, + sample_rate: u32, + channels: u16, +) { + if samples.is_empty() || sample_rate == 0 || channels == 0 { + return; + } + + let mut bytes = Vec::with_capacity(samples.len() * 2); + for sample in &samples { + bytes.extend_from_slice(&sample.to_le_bytes()); + } + + let encoded = base64::engine::general_purpose::STANDARD.encode(bytes); + let samples_per_channel = (samples.len() / usize::from(channels)) as u32; + + tx.send(AppEvent::CodexOp(Op::RealtimeConversationAudio( + ConversationAudioParams { + frame: RealtimeAudioFrame { + data: encoded, + sample_rate, + num_channels: channels, + samples_per_channel: Some(samples_per_channel), + }, + }, + ))); +} + #[inline] fn f32_abs_to_u16(x: f32) -> u16 { let peak_u = (x.abs().min(1.0) * i16::MAX as f32) as i32; @@ -338,6 +454,207 @@ fn convert_u16_to_i16_and_peak(input: &[u16], out: &mut Vec<i16>) -> u16 { peak as u16 } +// ------------------------- +// Realtime audio playback helpers +// ------------------------- + +pub(crate) struct RealtimeAudioPlayer { + _stream: cpal::Stream, + queue: Arc<Mutex<VecDeque<i16>>>, + output_sample_rate: u32, + output_channels: u16, +} + +impl RealtimeAudioPlayer { + pub(crate) fn start() -> Result<Self, String> { + let host = cpal::default_host(); + let device = host + .default_output_device() + .ok_or_else(|| "no output audio device available".to_string())?; + let config = device + .default_output_config() + .map_err(|e| format!("failed to get default output config: {e}"))?; + let output_sample_rate = config.sample_rate().0; + let output_channels = config.channels(); + let queue = Arc::new(Mutex::new(VecDeque::new())); + let stream = build_output_stream(&device, &config, Arc::clone(&queue))?; + stream + .play() + .map_err(|e| format!("failed to start output stream: {e}"))?; + Ok(Self { + _stream: stream, + queue, + output_sample_rate, + output_channels, + }) + } + + pub(crate) fn enqueue_frame(&self, frame: &RealtimeAudioFrame) -> Result<(), String>
{ + if frame.num_channels == 0 || frame.sample_rate == 0 { + return Err("invalid realtime audio frame format".to_string()); + } + let raw_bytes = base64::engine::general_purpose::STANDARD + .decode(&frame.data) + .map_err(|e| format!("failed to decode realtime audio: {e}"))?; + if raw_bytes.len() % 2 != 0 { + return Err("realtime audio frame had odd byte length".to_string()); + } + let mut pcm = Vec::with_capacity(raw_bytes.len() / 2); + for pair in raw_bytes.chunks_exact(2) { + pcm.push(i16::from_le_bytes([pair[0], pair[1]])); + } + let converted = convert_pcm16_for_output( + &pcm, + frame.sample_rate, + frame.num_channels, + self.output_sample_rate, + self.output_channels, + ); + if converted.is_empty() { + return Ok(()); + } + let mut guard = self + .queue + .lock() + .map_err(|_| "failed to lock output audio queue".to_string())?; + // TODO(aibrahim): Cap or trim this queue if we observe producer bursts outrunning playback. + guard.extend(converted); + Ok(()) + } + + pub(crate) fn clear(&self) { + if let Ok(mut guard) = self.queue.lock() { + guard.clear(); + } + } +} + +fn build_output_stream( + device: &cpal::Device, + config: &cpal::SupportedStreamConfig, + queue: Arc<Mutex<VecDeque<i16>>>, +) -> Result<cpal::Stream, String> { + let config_any: cpal::StreamConfig = config.clone().into(); + match config.sample_format() { + cpal::SampleFormat::F32 => device + .build_output_stream( + &config_any, + move |output: &mut [f32], _| fill_output_f32(output, &queue), + move |err| error!("audio output error: {err}"), + None, + ) + .map_err(|e| format!("failed to build f32 output stream: {e}")), + cpal::SampleFormat::I16 => device + .build_output_stream( + &config_any, + move |output: &mut [i16], _| fill_output_i16(output, &queue), + move |err| error!("audio output error: {err}"), + None, + ) + .map_err(|e| format!("failed to build i16 output stream: {e}")), + cpal::SampleFormat::U16 => device + .build_output_stream( + &config_any, + move |output: &mut [u16], _| fill_output_u16(output, &queue), + move |err|
error!("audio output error: {err}"), + None, + ) + .map_err(|e| format!("failed to build u16 output stream: {e}")), + other => Err(format!("unsupported output sample format: {other:?}")), + } +} + +fn fill_output_i16(output: &mut [i16], queue: &Arc<Mutex<VecDeque<i16>>>) { + if let Ok(mut guard) = queue.lock() { + for sample in output { + *sample = guard.pop_front().unwrap_or(0); + } + return; + } + output.fill(0); +} + +fn fill_output_f32(output: &mut [f32], queue: &Arc<Mutex<VecDeque<i16>>>) { + if let Ok(mut guard) = queue.lock() { + for sample in output { + let v = guard.pop_front().unwrap_or(0); + *sample = (v as f32) / (i16::MAX as f32); + } + return; + } + output.fill(0.0); +} + +fn fill_output_u16(output: &mut [u16], queue: &Arc<Mutex<VecDeque<i16>>>) { + if let Ok(mut guard) = queue.lock() { + for sample in output { + let v = guard.pop_front().unwrap_or(0); + *sample = (v as i32 + 32768).clamp(0, u16::MAX as i32) as u16; + } + return; + } + output.fill(32768); +} + +fn convert_pcm16_for_output( + input: &[i16], + input_sample_rate: u32, + input_channels: u16, + output_sample_rate: u32, + output_channels: u16, +) -> Vec<i16> { + if input.is_empty() || input_channels == 0 || output_channels == 0 { + return Vec::new(); + } + + let in_channels = input_channels as usize; + let out_channels = output_channels as usize; + let in_frames = input.len() / in_channels; + if in_frames == 0 { + return Vec::new(); + } + + let out_frames = if input_sample_rate == output_sample_rate { + in_frames + } else { + (((in_frames as u64) * (output_sample_rate as u64)) / (input_sample_rate as u64)).max(1) + as usize + }; + + let mut out = Vec::with_capacity(out_frames.saturating_mul(out_channels)); + for out_frame_idx in 0..out_frames { + let src_frame_idx = if out_frames <= 1 || in_frames <= 1 { + 0 + } else { + ((out_frame_idx as u64) * ((in_frames - 1) as u64) / ((out_frames - 1) as u64)) as usize + }; + let src_start = src_frame_idx.saturating_mul(in_channels); + let src = &input[src_start..src_start + in_channels]; + match (in_channels,
out_channels) { + (1, 1) => out.push(src[0]), + (1, n) => { + for _ in 0..n { + out.push(src[0]); + } + } + (n, 1) if n >= 2 => { + let sum: i32 = src.iter().map(|s| *s as i32).sum(); + out.push((sum / (n as i32)) as i16); + } + (n, m) if n == m => out.extend_from_slice(src), + (n, m) if n > m => out.extend_from_slice(&src[..m]), + (n, m) => { + out.extend_from_slice(src); + let last = *src.last().unwrap_or(&0); + for _ in n..m { + out.push(last); + } + } + } + } + out +} + // ------------------------- // Transcription helpers // -------------------------