Add realtime v2 event parser behind feature flag (#14537)

- Add a feature-flagged realtime v2 parser on the existing
websocket/session pipeline.
- Wire parser selection from core feature flags and map the codex
handoff tool-call path into existing handoff events.

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Ahmed Ibrahim 2026-03-12 21:12:40 -07:00 committed by GitHub
parent 650beb177e
commit 3e8f47169e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 352 additions and 10 deletions

View file

@ -2,6 +2,7 @@ use crate::endpoint::realtime_websocket::protocol::ConversationItem;
use crate::endpoint::realtime_websocket::protocol::ConversationItemContent;
use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame;
use crate::endpoint::realtime_websocket::protocol::RealtimeEvent;
use crate::endpoint::realtime_websocket::protocol::RealtimeEventParser;
use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage;
use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig;
use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta;
@ -202,6 +203,7 @@ pub struct RealtimeWebsocketWriter {
pub struct RealtimeWebsocketEvents {
rx_message: Arc<Mutex<mpsc::UnboundedReceiver<Result<Message, WsError>>>>,
active_transcript: Arc<Mutex<ActiveTranscriptState>>,
event_parser: RealtimeEventParser,
is_closed: Arc<AtomicBool>,
}
@ -248,6 +250,7 @@ impl RealtimeWebsocketConnection {
fn new(
stream: WsStream,
rx_message: mpsc::UnboundedReceiver<Result<Message, WsError>>,
event_parser: RealtimeEventParser,
) -> Self {
let stream = Arc::new(stream);
let is_closed = Arc::new(AtomicBool::new(false));
@ -259,6 +262,7 @@ impl RealtimeWebsocketConnection {
events: RealtimeWebsocketEvents {
rx_message: Arc::new(Mutex::new(rx_message)),
active_transcript: Arc::new(Mutex::new(ActiveTranscriptState::default())),
event_parser,
is_closed,
},
}
@ -376,7 +380,7 @@ impl RealtimeWebsocketEvents {
match msg {
Message::Text(text) => {
if let Some(mut event) = parse_realtime_event(&text) {
if let Some(mut event) = parse_realtime_event(&text, self.event_parser) {
self.update_active_transcript(&mut event).await;
debug!(?event, "realtime websocket parsed event");
return Ok(Some(event));
@ -495,7 +499,7 @@ impl RealtimeWebsocketClient {
);
let (stream, rx_message) = WsStream::new(stream);
let connection = RealtimeWebsocketConnection::new(stream, rx_message);
let connection = RealtimeWebsocketConnection::new(stream, rx_message, config.event_parser);
debug!(
session_id = config.session_id.as_deref().unwrap_or("<none>"),
"realtime websocket sending session.update"
@ -636,7 +640,7 @@ mod tests {
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str()),
parse_realtime_event(payload.as_str(), RealtimeEventParser::V1),
Some(RealtimeEvent::SessionUpdated {
session_id: "sess_123".to_string(),
instructions: Some("backend prompt".to_string()),
@ -655,7 +659,7 @@ mod tests {
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str()),
parse_realtime_event(payload.as_str(), RealtimeEventParser::V1),
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data: "AAA=".to_string(),
sample_rate: 48000,
@ -673,7 +677,7 @@ mod tests {
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str()),
parse_realtime_event(payload.as_str(), RealtimeEventParser::V1),
Some(RealtimeEvent::ConversationItemAdded(
json!({"type": "message", "seq": 7})
))
@ -688,7 +692,7 @@ mod tests {
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str()),
parse_realtime_event(payload.as_str(), RealtimeEventParser::V1),
Some(RealtimeEvent::ConversationItemDone {
item_id: "item_123".to_string(),
})
@ -706,7 +710,7 @@ mod tests {
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str()),
parse_realtime_event(payload.as_str(), RealtimeEventParser::V1),
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: "handoff_123".to_string(),
item_id: "item_123".to_string(),
@ -725,7 +729,7 @@ mod tests {
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str()),
parse_realtime_event(payload.as_str(), RealtimeEventParser::V1),
Some(RealtimeEvent::InputTranscriptDelta(
RealtimeTranscriptDelta {
delta: "hello ".to_string(),
@ -743,7 +747,7 @@ mod tests {
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str()),
parse_realtime_event(payload.as_str(), RealtimeEventParser::V1),
Some(RealtimeEvent::OutputTranscriptDelta(
RealtimeTranscriptDelta {
delta: "hi".to_string(),
@ -752,6 +756,68 @@ mod tests {
);
}
#[test]
fn parse_realtime_v2_handoff_tool_call_event() {
let payload = json!({
"type": "conversation.item.done",
"item": {
"id": "item_123",
"type": "function_call",
"name": "codex",
"call_id": "call_123",
"arguments": "{\"prompt\":\"delegate this\"}"
}
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: "call_123".to_string(),
item_id: "item_123".to_string(),
input_transcript: "delegate this".to_string(),
active_transcript: Vec::new(),
}))
);
}
#[test]
fn parse_realtime_v2_input_audio_transcription_delta_event() {
let payload = json!({
"type": "conversation.item.input_audio_transcription.delta",
"delta": "hello"
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
Some(RealtimeEvent::InputTranscriptDelta(
RealtimeTranscriptDelta {
delta: "hello".to_string(),
}
))
);
}
#[test]
fn parse_realtime_v2_output_audio_delta_defaults_audio_shape() {
let payload = json!({
"type": "response.output_audio.delta",
"delta": "AQID"
})
.to_string();
assert_eq!(
parse_realtime_event(payload.as_str(), RealtimeEventParser::RealtimeV2),
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data: "AQID".to_string(),
sample_rate: 24_000,
num_channels: 1,
samples_per_channel: None,
}))
);
}
#[test]
fn merge_request_headers_matches_http_precedence() {
let mut provider_headers = HeaderMap::new();
@ -1008,6 +1074,7 @@ mod tests {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::V1,
},
HeaderMap::new(),
HeaderMap::new(),
@ -1190,6 +1257,7 @@ mod tests {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_1".to_string()),
event_parser: RealtimeEventParser::V1,
},
HeaderMap::new(),
HeaderMap::new(),

View file

@ -1,5 +1,6 @@
pub mod methods;
pub mod protocol;
mod protocol_v2;
pub use codex_protocol::protocol::RealtimeAudioFrame;
pub use codex_protocol::protocol::RealtimeEvent;
@ -7,4 +8,5 @@ pub use methods::RealtimeWebsocketClient;
pub use methods::RealtimeWebsocketConnection;
pub use methods::RealtimeWebsocketEvents;
pub use methods::RealtimeWebsocketWriter;
pub use protocol::RealtimeEventParser;
pub use protocol::RealtimeSessionConfig;

View file

@ -1,3 +1,4 @@
use crate::endpoint::realtime_websocket::protocol_v2::parse_realtime_event_v2;
pub use codex_protocol::protocol::RealtimeAudioFrame;
pub use codex_protocol::protocol::RealtimeEvent;
pub use codex_protocol::protocol::RealtimeHandoffRequested;
@ -7,11 +8,18 @@ use serde::Serialize;
use serde_json::Value;
use tracing::debug;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RealtimeEventParser {
V1,
RealtimeV2,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RealtimeSessionConfig {
pub instructions: String,
pub model: Option<String>,
pub session_id: Option<String>,
pub event_parser: RealtimeEventParser,
}
#[derive(Debug, Clone, Serialize)]
@ -76,7 +84,17 @@ pub(super) struct ConversationItemContent {
pub(super) text: String,
}
pub(super) fn parse_realtime_event(payload: &str) -> Option<RealtimeEvent> {
pub(super) fn parse_realtime_event(
payload: &str,
event_parser: RealtimeEventParser,
) -> Option<RealtimeEvent> {
match event_parser {
RealtimeEventParser::V1 => parse_realtime_event_v1(payload),
RealtimeEventParser::RealtimeV2 => parse_realtime_event_v2(payload),
}
}
fn parse_realtime_event_v1(payload: &str) -> Option<RealtimeEvent> {
let parsed: Value = match serde_json::from_str(payload) {
Ok(msg) => msg,
Err(err) => {

View file

@ -0,0 +1,157 @@
use codex_protocol::protocol::RealtimeAudioFrame;
use codex_protocol::protocol::RealtimeEvent;
use codex_protocol::protocol::RealtimeHandoffRequested;
use codex_protocol::protocol::RealtimeTranscriptDelta;
use serde_json::Value;
use tracing::debug;
pub(super) fn parse_realtime_event_v2(payload: &str) -> Option<RealtimeEvent> {
let parsed: Value = match serde_json::from_str(payload) {
Ok(msg) => msg,
Err(err) => {
debug!("failed to parse realtime v2 event: {err}, data: {payload}");
return None;
}
};
let message_type = match parsed.get("type").and_then(Value::as_str) {
Some(message_type) => message_type,
None => {
debug!("received realtime v2 event without type field: {payload}");
return None;
}
};
match message_type {
"session.updated" => {
let session_id = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("id"))
.and_then(Value::as_str)
.map(str::to_string);
let instructions = parsed
.get("session")
.and_then(Value::as_object)
.and_then(|session| session.get("instructions"))
.and_then(Value::as_str)
.map(str::to_string);
session_id.map(|session_id| RealtimeEvent::SessionUpdated {
session_id,
instructions,
})
}
"response.output_audio.delta" => {
let data = parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)?;
let sample_rate = parsed
.get("sample_rate")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok())
.unwrap_or(24_000);
let num_channels = parsed
.get("channels")
.or_else(|| parsed.get("num_channels"))
.and_then(Value::as_u64)
.and_then(|value| u16::try_from(value).ok())
.unwrap_or(1);
Some(RealtimeEvent::AudioOut(RealtimeAudioFrame {
data,
sample_rate,
num_channels,
samples_per_channel: parsed
.get("samples_per_channel")
.and_then(Value::as_u64)
.and_then(|value| u32::try_from(value).ok()),
}))
}
"conversation.item.input_audio_transcription.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.input_audio_transcription.completed" => parsed
.get("transcript")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"response.output_text.delta" | "response.output_audio_transcript.delta" => parsed
.get("delta")
.and_then(Value::as_str)
.map(str::to_string)
.map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })),
"conversation.item.added" => parsed
.get("item")
.cloned()
.map(RealtimeEvent::ConversationItemAdded),
"conversation.item.done" => {
let item = parsed.get("item")?.as_object()?;
let item_type = item.get("type").and_then(Value::as_str);
let item_name = item.get("name").and_then(Value::as_str);
if item_type == Some("function_call") && item_name == Some("codex") {
let call_id = item
.get("call_id")
.and_then(Value::as_str)
.or_else(|| item.get("id").and_then(Value::as_str))?;
let item_id = item
.get("id")
.and_then(Value::as_str)
.unwrap_or(call_id)
.to_string();
let arguments = item.get("arguments").and_then(Value::as_str).unwrap_or("");
let mut input_transcript = String::new();
if !arguments.is_empty() {
if let Ok(arguments_json) = serde_json::from_str::<Value>(arguments)
&& let Some(arguments_object) = arguments_json.as_object()
{
for key in ["input_transcript", "input", "text", "prompt", "query"] {
if let Some(value) = arguments_object.get(key).and_then(Value::as_str) {
let trimmed = value.trim();
if !trimmed.is_empty() {
input_transcript = trimmed.to_string();
break;
}
}
}
}
if input_transcript.is_empty() {
input_transcript = arguments.to_string();
}
}
return Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: call_id.to_string(),
item_id,
input_transcript,
active_transcript: Vec::new(),
}));
}
item.get("id")
.and_then(Value::as_str)
.map(str::to_string)
.map(|item_id| RealtimeEvent::ConversationItemDone { item_id })
}
"error" => parsed
.get("message")
.and_then(Value::as_str)
.map(str::to_string)
.or_else(|| {
parsed
.get("error")
.and_then(Value::as_object)
.and_then(|error| error.get("message"))
.and_then(Value::as_str)
.map(str::to_string)
})
.or_else(|| parsed.get("error").map(ToString::to_string))
.map(RealtimeEvent::Error),
_ => {
debug!("received unsupported realtime v2 event type: {message_type}, data: {payload}");
None
}
}
}

View file

@ -27,6 +27,7 @@ pub use crate::common::create_text_param_for_request;
pub use crate::endpoint::compact::CompactClient;
pub use crate::endpoint::memories::MemoriesClient;
pub use crate::endpoint::models::ModelsClient;
pub use crate::endpoint::realtime_websocket::RealtimeEventParser;
pub use crate::endpoint::realtime_websocket::RealtimeSessionConfig;
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketClient;
pub use crate::endpoint::realtime_websocket::RealtimeWebsocketConnection;

View file

@ -4,10 +4,12 @@ use std::time::Duration;
use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeWebsocketClient;
use codex_api::provider::Provider;
use codex_api::provider::RetryConfig;
use codex_protocol::protocol::RealtimeHandoffRequested;
use futures::SinkExt;
use futures::StreamExt;
use http::HeaderMap;
@ -139,6 +141,7 @@ async fn realtime_ws_e2e_session_create_and_event_flow() {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
},
HeaderMap::new(),
HeaderMap::new(),
@ -231,6 +234,7 @@ async fn realtime_ws_e2e_send_while_next_event_waits() {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
},
HeaderMap::new(),
HeaderMap::new(),
@ -294,6 +298,7 @@ async fn realtime_ws_e2e_disconnected_emitted_once() {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
},
HeaderMap::new(),
HeaderMap::new(),
@ -354,6 +359,7 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::V1,
},
HeaderMap::new(),
HeaderMap::new(),
@ -377,3 +383,69 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() {
connection.close().await.expect("close");
server.await.expect("server task");
}
#[tokio::test]
async fn realtime_ws_e2e_realtime_v2_parser_emits_handoff_requested() {
let (addr, server) = spawn_realtime_ws_server(|mut ws: RealtimeWsStream| async move {
let first = ws
.next()
.await
.expect("first msg")
.expect("first msg ok")
.into_text()
.expect("text");
let first_json: Value = serde_json::from_str(&first).expect("json");
assert_eq!(first_json["type"], "session.update");
ws.send(Message::Text(
json!({
"type": "conversation.item.done",
"item": {
"id": "item_123",
"type": "function_call",
"name": "codex",
"call_id": "call_123",
"arguments": "{\"prompt\":\"delegate now\"}"
}
})
.to_string()
.into(),
))
.await
.expect("send function call");
})
.await;
let client = RealtimeWebsocketClient::new(test_provider(format!("http://{addr}")));
let connection = client
.connect(
RealtimeSessionConfig {
instructions: "backend prompt".to_string(),
model: Some("realtime-test-model".to_string()),
session_id: Some("conv_123".to_string()),
event_parser: RealtimeEventParser::RealtimeV2,
},
HeaderMap::new(),
HeaderMap::new(),
)
.await
.expect("connect");
let event = connection
.next_event()
.await
.expect("next event")
.expect("event");
assert_eq!(
event,
RealtimeEvent::HandoffRequested(RealtimeHandoffRequested {
handoff_id: "call_123".to_string(),
item_id: "item_123".to_string(),
input_transcript: "delegate now".to_string(),
active_transcript: Vec::new(),
})
);
connection.close().await.expect("close");
server.await.expect("server task");
}

View file

@ -426,6 +426,9 @@
"realtime_conversation": {
"type": "boolean"
},
"realtime_conversation_v2": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},
@ -1937,6 +1940,9 @@
"realtime_conversation": {
"type": "boolean"
},
"realtime_conversation_v2": {
"type": "boolean"
},
"remote_models": {
"type": "boolean"
},

View file

@ -180,6 +180,8 @@ pub enum Feature {
VoiceTranscription,
/// Enable experimental realtime voice conversation mode in the TUI.
RealtimeConversation,
/// Route realtime conversations through the v2 event parser.
RealtimeConversationV2,
/// Prevent idle system sleep while a turn is actively running.
PreventIdleSleep,
/// Use the Responses API WebSocket transport for OpenAI by default.
@ -823,6 +825,12 @@ pub const FEATURES: &[FeatureSpec] = &[
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::RealtimeConversationV2,
key: "realtime_conversation_v2",
stage: Stage::UnderDevelopment,
default_enabled: false,
},
FeatureSpec {
id: Feature::PreventIdleSleep,
key: "prevent_idle_sleep",

View file

@ -5,6 +5,7 @@ use crate::codex::Session;
use crate::default_client::default_headers;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::realtime_context::build_realtime_startup_context;
use async_channel::Receiver;
use async_channel::Sender;
@ -12,6 +13,7 @@ use async_channel::TrySendError;
use codex_api::Provider as ApiProvider;
use codex_api::RealtimeAudioFrame;
use codex_api::RealtimeEvent;
use codex_api::RealtimeEventParser;
use codex_api::RealtimeSessionConfig;
use codex_api::RealtimeWebsocketClient;
use codex_api::endpoint::realtime_websocket::RealtimeWebsocketEvents;
@ -117,6 +119,7 @@ impl RealtimeConversationManager {
prompt: String,
model: Option<String>,
session_id: Option<String>,
event_parser: RealtimeEventParser,
) -> CodexResult<(Receiver<RealtimeEvent>, Arc<AtomicBool>)> {
let previous_state = {
let mut guard = self.state.lock().await;
@ -132,6 +135,7 @@ impl RealtimeConversationManager {
instructions: prompt,
model,
session_id,
event_parser,
};
let client = RealtimeWebsocketClient::new(api_provider);
let connection = client
@ -298,6 +302,11 @@ pub(crate) async fn handle_start(
format!("{prompt}\n\n{startup_context}")
};
let model = config.experimental_realtime_ws_model.clone();
let event_parser = if config.features.enabled(Feature::RealtimeConversationV2) {
RealtimeEventParser::RealtimeV2
} else {
RealtimeEventParser::V1
};
let requested_session_id = params
.session_id
@ -313,6 +322,7 @@ pub(crate) async fn handle_start(
prompt,
model,
requested_session_id.clone(),
event_parser,
)
.await
{