diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 141f57d9d..84ba80a82 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -2,6 +2,7 @@ use crate::endpoint::realtime_websocket::protocol::ConversationItem; use crate::endpoint::realtime_websocket::protocol::ConversationItemContent; use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame; use crate::endpoint::realtime_websocket::protocol::RealtimeEvent; +use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser; use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig; use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta; @@ -202,6 +203,7 @@ pub struct RealtimeWebsocketWriter { pub struct RealtimeWebsocketEvents { rx_message: Arc>>>, active_transcript: Arc>, + event_parser: RealtimeEventParser, is_closed: Arc, } @@ -248,6 +250,7 @@ impl RealtimeWebsocketConnection { fn new( stream: WsStream, rx_message: mpsc::UnboundedReceiver>, + event_parser: RealtimeEventParser, ) -> Self { let stream = Arc::new(stream); let is_closed = Arc::new(AtomicBool::new(false)); @@ -259,6 +262,7 @@ impl RealtimeWebsocketConnection { events: RealtimeWebsocketEvents { rx_message: Arc::new(Mutex::new(rx_message)), active_transcript: Arc::new(Mutex::new(ActiveTranscriptState::default())), + event_parser, is_closed, }, } @@ -376,7 +380,7 @@ impl RealtimeWebsocketEvents { match msg { Message::Text(text) => { - if let Some(mut event) = parse_realtime_event(&text) { + if let Some(mut event) = parse_realtime_event(&text, self.event_parser) { self.update_active_transcript(&mut event).await; debug!(?event, "realtime websocket parsed event"); return Ok(Some(event)); @@ -495,7 +499,7 @@ impl RealtimeWebsocketClient { ); let (stream, rx_message) = WsStream::new(stream); - let connection = RealtimeWebsocketConnection::new(stream, rx_message); + let connection = RealtimeWebsocketConnection::new(stream, rx_message, config.event_parser); debug!( session_id = config.session_id.as_deref().unwrap_or(""), "realtime websocket sending session.update" @@ -636,7 +640,7 @@ mod tests { .to_string(); assert_eq!( - parse_realtime_event(payload.as_str()), + parse_realtime_event(payload.as_str(), RealtimeEventParser::V1), Some(RealtimeEvent::SessionUpdated { session_id: "sess_123".to_string(), instructions: Some("backend prompt".to_string()), @@ -655,7 +659,7 @@ mod tests { }) .to_string(); assert_eq!( - parse_realtime_event(payload.as_str()), + parse_realtime_event(payload.as_str(), RealtimeEventParser::V1), Some(RealtimeEvent::AudioOut(RealtimeAudioFrame { data: "AAA=".to_string(), sample_rate: 48000, @@ -673,7 +677,7 @@ mod tests { }) .to_string(); assert_eq!( - parse_realtime_event(payload.as_str()), + parse_realtime_event(payload.as_str(), RealtimeEventParser::V1), Some(RealtimeEvent::ConversationItemAdded( json!({"type": "message", "seq": 7}) )) @@ -688,7 +692,7 @@ mod tests { }) .to_string(); assert_eq!( - parse_realtime_event(payload.as_str()), + parse_realtime_event(payload.as_str(), RealtimeEventParser::V1), Some(RealtimeEvent::ConversationItemDone { item_id: "item_123".to_string(), }) @@ -706,7 +710,7 @@ mod tests { .to_string(); assert_eq!( - parse_realtime_event(payload.as_str()), + parse_realtime_event(payload.as_str(), RealtimeEventParser::V1), Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { handoff_id: "handoff_123".to_string(), item_id: "item_123".to_string(), @@ -725,7 +729,7 @@ mod tests { .to_string(); assert_eq!( - parse_realtime_event(payload.as_str()), + parse_realtime_event(payload.as_str(), RealtimeEventParser::V1), Some(RealtimeEvent::InputTranscriptDelta( RealtimeTranscriptDelta { delta: "hello ".to_string(), @@ -743,7 +747,7 @@ mod tests { .to_string(); assert_eq!( - parse_realtime_event(payload.as_str()), + parse_realtime_event(payload.as_str(), RealtimeEventParser::V1), Some(RealtimeEvent::OutputTranscriptDelta( RealtimeTranscriptDelta { delta: "hi".to_string(), @@ -752,6 +756,68 @@ mod tests { ); } + #[test] + fn parse_realtime_v2_handoff_tool_call_event() { + let payload = json!({ + "type": "conversation.item.done", + "item": { + "id": "item_123", + "type": "function_call", + "name": "codex", + "call_id": "call_123", + "arguments": "{\"prompt\":\"delegate this\"}" + } + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), + Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { + handoff_id: "call_123".to_string(), + item_id: "item_123".to_string(), + input_transcript: "delegate this".to_string(), + active_transcript: Vec::new(), + })) + ); + } + + #[test] + fn parse_realtime_v2_input_audio_transcription_delta_event() { + let payload = json!({ + "type": "conversation.item.input_audio_transcription.delta", + "delta": "hello" + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), + Some(RealtimeEvent::InputTranscriptDelta( + RealtimeTranscriptDelta { + delta: "hello".to_string(), + } + )) + ); + } + + #[test] + fn parse_realtime_v2_output_audio_delta_defaults_audio_shape() { + let payload = json!({ + "type": "response.output_audio.delta", + "delta": "AQID" + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2), + Some(RealtimeEvent::AudioOut(RealtimeAudioFrame { + data: "AQID".to_string(), + sample_rate: 24_000, + num_channels: 1, + samples_per_channel: None, + })) + ); + } + #[test] fn merge_request_headers_matches_http_precedence() { let mut provider_headers = HeaderMap::new(); @@ -1008,6 +1074,7 @@ mod tests { instructions: "backend prompt".to_string(), model: Some("realtime-test-model".to_string()), session_id: Some("conv_1".to_string()), + event_parser: RealtimeEventParser::V1, }, HeaderMap::new(), HeaderMap::new(), @@ -1190,6 +1257,7 @@ mod tests { instructions: "backend prompt".to_string(), model: Some("realtime-test-model".to_string()), session_id: Some("conv_1".to_string()), + event_parser: RealtimeEventParser::V1, }, HeaderMap::new(), HeaderMap::new(), diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs index a89dbd3e7..3428e73c5 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/mod.rs @@ -1,5 +1,6 @@ pub mod methods; pub mod protocol; +mod protocol_v2; pub use codex_protocol::protocol::RealtimeAudioFrame; pub use codex_protocol::protocol::RealtimeEvent; @@ -7,4 +8,5 @@ pub use methods::RealtimeWebsocketClient; pub use methods::RealtimeWebsocketConnection; pub use methods::RealtimeWebsocketEvents; pub use methods::RealtimeWebsocketWriter; +pub use protocol::RealtimeEventParser; pub use protocol::RealtimeSessionConfig; diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs index 7967d5999..afe29ab8e 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs @@ -1,3 +1,4 @@ +use crate::endpoint::realtime_websocket::protocol_v2::parse_realtime_event_v2; pub use codex_protocol::protocol::RealtimeAudioFrame; pub use codex_protocol::protocol::RealtimeEvent; pub use codex_protocol::protocol::RealtimeHandoffRequested; @@ -7,11 +8,18 @@ use serde::Serialize; use serde_json::Value; use tracing::debug; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RealtimeEventParser { + V1, + RealtimeV2, +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct RealtimeSessionConfig { pub instructions: String, pub model: Option, pub session_id: Option, + pub event_parser: RealtimeEventParser, } #[derive(Debug, Clone, Serialize)] @@ -76,7 +84,17 @@ pub(super) struct ConversationItemContent { pub(super) text: String, } -pub(super) fn parse_realtime_event(payload: &str) -> Option { +pub(super) fn parse_realtime_event( + payload: &str, + event_parser: RealtimeEventParser, +) -> Option { + match event_parser { + RealtimeEventParser::V1 => parse_realtime_event_v1(payload), + RealtimeEventParser::RealtimeV2 => parse_realtime_event_v2(payload), + } +} + +fn parse_realtime_event_v1(payload: &str) -> Option { let parsed: Value = match serde_json::from_str(payload) { Ok(msg) => msg, Err(err) => { diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs new file mode 100644 index 000000000..fd9d39abb --- /dev/null +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol_v2.rs @@ -0,0 +1,157 @@ +use codex_protocol::protocol::RealtimeAudioFrame; +use codex_protocol::protocol::RealtimeEvent; +use codex_protocol::protocol::RealtimeHandoffRequested; +use codex_protocol::protocol::RealtimeTranscriptDelta; +use serde_json::Value; +use tracing::debug; + +pub(super) fn parse_realtime_event_v2(payload: &str) -> Option { + let parsed: Value = match serde_json::from_str(payload) { + Ok(msg) => msg, + Err(err) => { + debug!("failed to parse realtime v2 event: {err}, data: {payload}"); + return None; + } + }; + + let message_type = match parsed.get("type").and_then(Value::as_str) { + Some(message_type) => message_type, + None => { + debug!("received realtime v2 event without type field: {payload}"); + return None; + } + }; + + match message_type { + "session.updated" => { + let session_id = parsed + .get("session") + .and_then(Value::as_object) + .and_then(|session| session.get("id")) + .and_then(Value::as_str) + .map(str::to_string); + let instructions = parsed + .get("session") + .and_then(Value::as_object) + .and_then(|session| session.get("instructions")) + .and_then(Value::as_str) + .map(str::to_string); + session_id.map(|session_id| RealtimeEvent::SessionUpdated { + session_id, + instructions, + }) + } + "response.output_audio.delta" => { + let data = parsed + .get("delta") + .and_then(Value::as_str) + .map(str::to_string)?; + let sample_rate = parsed + .get("sample_rate") + .and_then(Value::as_u64) + .and_then(|value| u32::try_from(value).ok()) + .unwrap_or(24_000); + let num_channels = parsed + .get("channels") + .or_else(|| parsed.get("num_channels")) + .and_then(Value::as_u64) + .and_then(|value| u16::try_from(value).ok()) + .unwrap_or(1); + Some(RealtimeEvent::AudioOut(RealtimeAudioFrame { + data, + sample_rate, + num_channels, + samples_per_channel: parsed + .get("samples_per_channel") + .and_then(Value::as_u64) + .and_then(|value| u32::try_from(value).ok()), + })) + } + "conversation.item.input_audio_transcription.delta" => parsed + .get("delta") + .and_then(Value::as_str) + .map(str::to_string) + .map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })), + "conversation.item.input_audio_transcription.completed" => parsed + .get("transcript") + .and_then(Value::as_str) + .map(str::to_string) + .map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })), + "response.output_text.delta" | "response.output_audio_transcript.delta" => parsed + .get("delta") + .and_then(Value::as_str) + .map(str::to_string) + .map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })), + "conversation.item.added" => parsed + .get("item") + .cloned() + .map(RealtimeEvent::ConversationItemAdded), + "conversation.item.done" => { + let item = parsed.get("item")?.as_object()?; + let item_type = item.get("type").and_then(Value::as_str); + let item_name = item.get("name").and_then(Value::as_str); + + if item_type == Some("function_call") && item_name == Some("codex") { + let call_id = item + .get("call_id") + .and_then(Value::as_str) + .or_else(|| item.get("id").and_then(Value::as_str))?; + let item_id = item + .get("id") + .and_then(Value::as_str) + .unwrap_or(call_id) + .to_string(); + let arguments = item.get("arguments").and_then(Value::as_str).unwrap_or(""); + let mut input_transcript = String::new(); + if !arguments.is_empty() { + if let Ok(arguments_json) = serde_json::from_str::(arguments) + && let Some(arguments_object) = arguments_json.as_object() + { + for key in ["input_transcript", "input", "text", "prompt", "query"] { + if let Some(value) = arguments_object.get(key).and_then(Value::as_str) { + let trimmed = value.trim(); + if !trimmed.is_empty() { + input_transcript = trimmed.to_string(); + break; + } + } + } + } + if input_transcript.is_empty() { + input_transcript = arguments.to_string(); + } + } + + return Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { + handoff_id: call_id.to_string(), + item_id, + input_transcript, + active_transcript: Vec::new(), + })); + } + + item.get("id") + .and_then(Value::as_str) + .map(str::to_string) + .map(|item_id| RealtimeEvent::ConversationItemDone { item_id }) + } + "error" => parsed + .get("message") + .and_then(Value::as_str) + .map(str::to_string) + .or_else(|| { + parsed + .get("error") + .and_then(Value::as_object) + .and_then(|error| error.get("message")) + .and_then(Value::as_str) + .map(str::to_string) + }) + .or_else(|| parsed.get("error").map(ToString::to_string)) + .map(RealtimeEvent::Error), + _ => { + debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}"); + None + } + } +} diff --git a/codex-rs/codex-api/src/lib.rs b/codex-rs/codex-api/src/lib.rs index 138929602..35ae983b9 100644 --- a/codex-rs/codex-api/src/lib.rs +++ b/codex-rs/codex-api/src/lib.rs @@ -27,6 +27,7 @@ pub use crate::common::create_text_param_for_request; pub use crate::endpoint::compact::CompactClient; pub use crate::endpoint::memories::MemoriesClient; pub use crate::endpoint::models::ModelsClient; +pub use crate::endpoint::realtime_websocket::RealtimeEventParser; pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig; pub use crate::endpoint::realtime_websocket::RealtimeWebsocketClient; pub use crate::endpoint::realtime_websocket::RealtimeWebsocketConnection; diff --git a/codex-rs/codex-api/tests/realtime_websocket_e2e.rs b/codex-rs/codex-api/tests/realtime_websocket_e2e.rs index aa11b7958..d6d73c0f0 100644 --- a/codex-rs/codex-api/tests/realtime_websocket_e2e.rs +++ b/codex-rs/codex-api/tests/realtime_websocket_e2e.rs @@ -4,10 +4,12 @@ use std::time::Duration; use codex_api::RealtimeAudioFrame; use codex_api::RealtimeEvent; +use codex_api::RealtimeEventParser; use codex_api::RealtimeSessionConfig; use codex_api::RealtimeWebsocketClient; use codex_api::provider::Provider; use codex_api::provider::RetryConfig; +use codex_protocol::protocol::RealtimeHandoffRequested; use futures::SinkExt; use futures::StreamExt; use http::HeaderMap; @@ -139,6 +141,7 @@ async fn realtime_ws_e2e_session_create_and_event_flow() { instructions: "backend prompt".to_string(), model: Some("realtime-test-model".to_string()), session_id: Some("conv_123".to_string()), + event_parser: RealtimeEventParser::V1, }, HeaderMap::new(), HeaderMap::new(), @@ -231,6 +234,7 @@ async fn realtime_ws_e2e_send_while_next_event_waits() { instructions: "backend prompt".to_string(), model: Some("realtime-test-model".to_string()), session_id: Some("conv_123".to_string()), + event_parser: RealtimeEventParser::V1, }, HeaderMap::new(), HeaderMap::new(), @@ -294,6 +298,7 @@ async fn realtime_ws_e2e_disconnected_emitted_once() { instructions: "backend prompt".to_string(), model: Some("realtime-test-model".to_string()), session_id: Some("conv_123".to_string()), + event_parser: RealtimeEventParser::V1, }, HeaderMap::new(), HeaderMap::new(), @@ -354,6 +359,7 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() { instructions: "backend prompt".to_string(), model: Some("realtime-test-model".to_string()), session_id: Some("conv_123".to_string()), + event_parser: RealtimeEventParser::V1, }, HeaderMap::new(), HeaderMap::new(), @@ -377,3 +383,69 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() { connection.close().await.expect("close"); server.await.expect("server task"); } + +#[tokio::test] +async fn realtime_ws_e2e_realtime_v2_parser_emits_handoff_requested() { + let (addr, server) = spawn_realtime_ws_server(|mut ws: RealtimeWsStream| async move { + let first = ws + .next() + .await + .expect("first msg") + .expect("first msg ok") + .into_text() + .expect("text"); + let first_json: Value = serde_json::from_str(&first).expect("json"); + assert_eq!(first_json["type"], "session.update"); + + ws.send(Message::Text( + json!({ + "type": "conversation.item.done", + "item": { + "id": "item_123", + "type": "function_call", + "name": "codex", + "call_id": "call_123", + "arguments": "{\"prompt\":\"delegate now\"}" + } + }) + .to_string() + .into(), + )) + .await + .expect("send function call"); + }) + .await; + + let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}"))); + let connection = client + .connect( + RealtimeSessionConfig { + instructions: "backend prompt".to_string(), + model: Some("realtime-test-model".to_string()), + session_id: Some("conv_123".to_string()), + event_parser: RealtimeEventParser::RealtimeV2, + }, + HeaderMap::new(), + HeaderMap::new(), + ) + .await + .expect("connect"); + + let event = connection + .next_event() + .await + .expect("next event") + .expect("event"); + assert_eq!( + event, + RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { + handoff_id: "call_123".to_string(), + item_id: "item_123".to_string(), + input_transcript: "delegate now".to_string(), + active_transcript: Vec::new(), + }) + ); + + connection.close().await.expect("close"); + server.await.expect("server task"); +} diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 5a0353b5d..3b2f07f85 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -426,6 +426,9 @@ "realtime_conversation": { "type": "boolean" }, + "realtime_conversation_v2": { + "type": "boolean" + }, "remote_models": { "type": "boolean" }, @@ -1937,6 +1940,9 @@ "realtime_conversation": { "type": "boolean" }, + "realtime_conversation_v2": { + "type": "boolean" + }, "remote_models": { "type": "boolean" }, diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index d0e4e290b..8072add4a 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -180,6 +180,8 @@ pub enum Feature { VoiceTranscription, /// Enable experimental realtime voice conversation mode in the TUI. RealtimeConversation, + /// Route realtime conversations through the v2 event parser. + RealtimeConversationV2, /// Prevent idle system sleep while a turn is actively running. PreventIdleSleep, /// Use the Responses API WebSocket transport for OpenAI by default. @@ -823,6 +825,12 @@ pub const FEATURES: &[FeatureSpec] = &[ stage: Stage::UnderDevelopment, default_enabled: false, }, + FeatureSpec { + id: Feature::RealtimeConversationV2, + key: "realtime_conversation_v2", + stage: Stage::UnderDevelopment, + default_enabled: false, + }, FeatureSpec { id: Feature::PreventIdleSleep, key: "prevent_idle_sleep", diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index f1ce8398e..bc822a9bf 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -5,6 +5,7 @@ use crate::codex::Session; use crate::default_client::default_headers; use crate::error::CodexErr; use crate::error::Result as CodexResult; +use crate::features::Feature; use crate::realtime_context::build_realtime_startup_context; use async_channel::Receiver; use async_channel::Sender; @@ -12,6 +13,7 @@ use async_channel::TrySendError; use codex_api::Provider as ApiProvider; use codex_api::RealtimeAudioFrame; use codex_api::RealtimeEvent; +use codex_api::RealtimeEventParser; use codex_api::RealtimeSessionConfig; use codex_api::RealtimeWebsocketClient; use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents; @@ -117,6 +119,7 @@ impl RealtimeConversationManager { prompt: String, model: Option, session_id: Option, + event_parser: RealtimeEventParser, ) -> CodexResult<(Receiver, Arc)> { let previous_state = { let mut guard = self.state.lock().await; @@ -132,6 +135,7 @@ impl RealtimeConversationManager { instructions: prompt, model, session_id, + event_parser, }; let client = RealtimeWebsocketClient::new(api_provider); let connection = client @@ -298,6 +302,11 @@ pub(crate) async fn handle_start( format!("{prompt}\n\n{startup_context}") }; let model = config.experimental_realtime_ws_model.clone(); + let event_parser = if config.features.enabled(Feature::RealtimeConversationV2) { + RealtimeEventParser::RealtimeV2 + } else { + RealtimeEventParser::V1 + }; let requested_session_id = params .session_id @@ -313,6 +322,7 @@ pub(crate) async fn handle_start( prompt, model, requested_session_id.clone(), + event_parser, ) .await {