From b20b6aa46fa54297eb52d73db475b5901ed76062 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 2 Mar 2026 16:05:40 -0800 Subject: [PATCH] Update realtime websocket API (#13265) - migrate the realtime websocket transport to the new session and handoff flow - make the realtime model configurable in config.toml and use API-key auth for the websocket --------- Co-authored-by: Codex --- .../schema/json/EventMsg.json | 104 +++- .../codex_app_server_protocol.schemas.json | 104 +++- .../schema/typescript/RealtimeEvent.ts | 3 +- .../typescript/RealtimeHandoffMessage.ts | 5 + .../typescript/RealtimeHandoffRequested.ts | 6 + .../schema/typescript/index.ts | 2 + .../app-server/src/bespoke_event_handling.rs | 19 +- .../tests/suite/v2/realtime_conversation.rs | 38 +- .../endpoint/realtime_websocket/methods.rs | 395 +++++++++--- .../endpoint/realtime_websocket/protocol.rs | 135 ++-- .../codex-api/tests/realtime_websocket_e2e.rs | 75 ++- codex-rs/core/config.schema.json | 8 +- codex-rs/core/src/codex.rs | 15 +- codex-rs/core/src/config/mod.rs | 53 +- codex-rs/core/src/realtime_conversation.rs | 356 ++++++++--- codex-rs/core/tests/common/responses.rs | 13 +- codex-rs/core/tests/common/test_codex.rs | 1 + codex-rs/core/tests/suite/compact_remote.rs | 8 +- .../core/tests/suite/realtime_conversation.rs | 587 +++++++++++++----- codex-rs/protocol/src/protocol.rs | 24 +- codex-rs/tui/src/chatwidget/realtime.rs | 5 +- 21 files changed, 1449 insertions(+), 507 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffMessage.ts create mode 100644 codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffRequested.ts diff --git a/codex-rs/app-server-protocol/schema/json/EventMsg.json b/codex-rs/app-server-protocol/schema/json/EventMsg.json index b3e431706..200135f25 100644 --- a/codex-rs/app-server-protocol/schema/json/EventMsg.json +++ b/codex-rs/app-server-protocol/schema/json/EventMsg.json @@ -4234,8 +4234,14 @@ { "additionalProperties": false, "properties": { - "SessionCreated": { + "SessionUpdated": { "properties": { + "instructions": { + "type": [ + "string", + "null" + ] + }, "session_id": { "type": "string" } @@ -4246,27 +4252,6 @@ "type": "object" } }, - "required": [ - "SessionCreated" - ], - "title": "SessionCreatedRealtimeEvent", - "type": "object" - }, - { - "additionalProperties": false, - "properties": { - "SessionUpdated": { - "properties": { - "backend_prompt": { - "type": [ - "string", - "null" - ] - } - }, - "type": "object" - } - }, "required": [ "SessionUpdated" ], @@ -4297,6 +4282,40 @@ "title": "ConversationItemAddedRealtimeEvent", "type": "object" }, + { + "additionalProperties": false, + "properties": { + "ConversationItemDone": { + "properties": { + "item_id": { + "type": "string" + } + }, + "required": [ + "item_id" + ], + "type": "object" + } + }, + "required": [ + "ConversationItemDone" + ], + "title": "ConversationItemDoneRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "HandoffRequested": { + "$ref": "#/definitions/RealtimeHandoffRequested" + } + }, + "required": [ + "HandoffRequested" + ], + "title": "HandoffRequestedRealtimeEvent", + "type": "object" + }, { "additionalProperties": false, "properties": { @@ -4312,6 +4331,47 @@ } ] }, + "RealtimeHandoffMessage": { + "properties": { + "role": { + "type": "string" + }, + "text": { + "type": "string" + } + }, + "required": [ + "role", + "text" + ], + "type": "object" + }, + "RealtimeHandoffRequested": { + "properties": { + "handoff_id": { + "type": "string" + }, + "input_transcript": { + "type": "string" + }, + "item_id": { + "type": "string" + }, + "messages": { + "items": { + "$ref": "#/definitions/RealtimeHandoffMessage" + }, + "type": "array" + } + }, + "required": [ + "handoff_id", + "input_transcript", + "item_id", + "messages" + ], + "type": "object" + }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 275e0dd94..694f8312f 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -5454,8 +5454,14 @@ { "additionalProperties": false, "properties": { - "SessionCreated": { + "SessionUpdated": { "properties": { + "instructions": { + "type": [ + "string", + "null" + ] + }, "session_id": { "type": "string" } @@ -5466,27 +5472,6 @@ "type": "object" } }, - "required": [ - "SessionCreated" - ], - "title": "SessionCreatedRealtimeEvent", - "type": "object" - }, - { - "additionalProperties": false, - "properties": { - "SessionUpdated": { - "properties": { - "backend_prompt": { - "type": [ - "string", - "null" - ] - } - }, - "type": "object" - } - }, "required": [ "SessionUpdated" ], @@ -5517,6 +5502,40 @@ "title": "ConversationItemAddedRealtimeEvent", "type": "object" }, + { + "additionalProperties": false, + "properties": { + "ConversationItemDone": { + "properties": { + "item_id": { + "type": "string" + } + }, + "required": [ + "item_id" + ], + "type": "object" + } + }, + "required": [ + "ConversationItemDone" + ], + "title": "ConversationItemDoneRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "HandoffRequested": { + "$ref": "#/definitions/RealtimeHandoffRequested" + } + }, + "required": [ + "HandoffRequested" + ], + "title": "HandoffRequestedRealtimeEvent", + "type": "object" + }, { "additionalProperties": false, "properties": { @@ -5532,6 +5551,47 @@ } ] }, + "RealtimeHandoffMessage": { + "properties": { + "role": { + "type": "string" + }, + "text": { + "type": "string" + } + }, + "required": [ + "role", + "text" + ], + "type": "object" + }, + "RealtimeHandoffRequested": { + "properties": { + "handoff_id": { + "type": "string" + }, + "input_transcript": { + "type": "string" + }, + "item_id": { + "type": "string" + }, + "messages": { + "items": { + "$ref": "#/definitions/RealtimeHandoffMessage" + }, + "type": "array" + } + }, + "required": [ + "handoff_id", + "input_transcript", + "item_id", + "messages" + ], + "type": "object" + }, "RejectConfig": { "properties": { "mcp_elicitations": { diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts index 6297beebe..8286f34ca 100644 --- a/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts @@ -2,6 +2,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { RealtimeAudioFrame } from "./RealtimeAudioFrame"; +import type { RealtimeHandoffRequested } from "./RealtimeHandoffRequested"; import type { JsonValue } from "./serde_json/JsonValue"; -export type RealtimeEvent = { "SessionCreated": { session_id: string, } } | { "SessionUpdated": { backend_prompt: string | null, } } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "Error": string }; +export type RealtimeEvent = { "SessionUpdated": { session_id: string, instructions: string | null, } } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "ConversationItemDone": { item_id: string, } } | { "HandoffRequested": RealtimeHandoffRequested } | { "Error": string }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffMessage.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffMessage.ts new file mode 100644 index 000000000..39b77cfdf --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffMessage.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type RealtimeHandoffMessage = { role: string, text: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffRequested.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffRequested.ts new file mode 100644 index 000000000..f3426706a --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffRequested.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { RealtimeHandoffMessage } from "./RealtimeHandoffMessage"; + +export type RealtimeHandoffRequested = { handoff_id: string, item_id: string, input_transcript: string, messages: Array, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index bc5b0297b..c72f24999 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -161,6 +161,8 @@ export type { RealtimeConversationClosedEvent } from "./RealtimeConversationClos export type { RealtimeConversationRealtimeEvent } from "./RealtimeConversationRealtimeEvent"; export type { RealtimeConversationStartedEvent } from "./RealtimeConversationStartedEvent"; export type { RealtimeEvent } from "./RealtimeEvent"; +export type { RealtimeHandoffMessage } from "./RealtimeHandoffMessage"; +export type { RealtimeHandoffRequested } from "./RealtimeHandoffRequested"; export type { ReasoningContentDeltaEvent } from "./ReasoningContentDeltaEvent"; export type { ReasoningEffort } from "./ReasoningEffort"; export type { ReasoningItem } from "./ReasoningItem"; diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 35725e7a8..f23a1ec09 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -232,7 +232,6 @@ pub(crate) async fn apply_bespoke_event_handling( EventMsg::RealtimeConversationRealtime(event) => { if let ApiVersion::V2 = api_version { match event.payload { - RealtimeEvent::SessionCreated { .. } => {} RealtimeEvent::SessionUpdated { .. } => {} RealtimeEvent::AudioOut(audio) => { let notification = ThreadRealtimeOutputAudioDeltaNotification { @@ -256,6 +255,24 @@ pub(crate) async fn apply_bespoke_event_handling( )) .await; } + RealtimeEvent::ConversationItemDone { .. } => {} + RealtimeEvent::HandoffRequested(handoff) => { + let notification = ThreadRealtimeItemAddedNotification { + thread_id: conversation_id.to_string(), + item: serde_json::json!({ + "type": "handoff_request", + "handoff_id": handoff.handoff_id, + "item_id": handoff.item_id, + "input_transcript": handoff.input_transcript, + "messages": handoff.messages, + }), + }; + outgoing + .send_server_notification(ServerNotification::ThreadRealtimeItemAdded( + notification, + )) + .await; + } RealtimeEvent::Error(message) => { let notification = ThreadRealtimeErrorNotification { thread_id: conversation_id.to_string(), diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index f2664848c..139e8b378 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -5,6 +5,7 @@ use app_test_support::create_mock_responses_server_sequence_unchecked; use app_test_support::to_response; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::LoginApiKeyParams; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadRealtimeAppendAudioParams; use codex_app_server_protocol::ThreadRealtimeAppendAudioResponse; @@ -42,20 +43,17 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { let responses_server = create_mock_responses_server_sequence_unchecked(Vec::new()).await; let realtime_server = start_websocket_server(vec![vec![ - vec![json!({ - "type": "session.created", - "session": { "id": "sess_backend" } - })], vec![json!({ "type": "session.updated", - "session": { "backend_prompt": "backend prompt" } + "session": { "id": "sess_backend", "instructions": "backend prompt" } })], + vec![], vec![ json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 24_000, - "num_channels": 1, + "channels": 1, "samples_per_channel": 512 }), json!({ @@ -84,6 +82,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { let mut mcp = McpProcess::new(codex_home.path()).await?; mcp.initialize().await?; + login_with_api_key(&mut mcp, "sk-test-key").await?; let thread_start_request_id = mcp .send_thread_start_request(ThreadStartParams::default()) @@ -182,7 +181,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { assert_eq!(connection.len(), 3); assert_eq!( connection[0].body_json()["type"].as_str(), - Some("session.create") + Some("session.update") ); let mut request_types = [ connection[1].body_json()["type"] @@ -199,7 +198,7 @@ async fn realtime_conversation_streams_v2_notifications() -> Result<()> { request_types, [ "conversation.item.create".to_string(), - "response.input_audio.delta".to_string(), + "input_audio_buffer.append".to_string(), ] ); @@ -214,8 +213,8 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> { let responses_server = create_mock_responses_server_sequence_unchecked(Vec::new()).await; let realtime_server = start_websocket_server(vec![vec![ vec![json!({ - "type": "session.created", - "session": { "id": "sess_backend" } + "type": "session.updated", + "session": { "id": "sess_backend", "instructions": "backend prompt" } })], vec![], ]]) @@ -231,6 +230,7 @@ async fn realtime_conversation_stop_emits_closed_notification() -> Result<()> { let mut mcp = McpProcess::new(codex_home.path()).await?; mcp.initialize().await?; + login_with_api_key(&mut mcp, "sk-test-key").await?; let thread_start_request_id = mcp .send_thread_start_request(ThreadStartParams::default()) @@ -349,6 +349,22 @@ async fn read_notification(mcp: &mut McpProcess, method: &s Ok(serde_json::from_value(params)?) } +async fn login_with_api_key(mcp: &mut McpProcess, api_key: &str) -> Result<()> { + let request_id = mcp + .send_login_api_key_request(LoginApiKeyParams { + api_key: api_key.to_string(), + }) + .await?; + + timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + + Ok(()) +} + fn create_config_toml( codex_home: &Path, responses_server_uri: &str, diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 97d0b9e32..cb6583ded 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -4,7 +4,10 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame; use crate::endpoint::realtime_websocket::protocol::RealtimeEvent; use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig; -use crate::endpoint::realtime_websocket::protocol::SessionCreateSession; +use crate::endpoint::realtime_websocket::protocol::SessionAudio; +use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat; +use crate::endpoint::realtime_websocket::protocol::SessionAudioInput; +use crate::endpoint::realtime_websocket::protocol::SessionAudioOutput; use crate::endpoint::realtime_websocket::protocol::SessionUpdateSession; use crate::endpoint::realtime_websocket::protocol::parse_realtime_event; use crate::error::ApiError; @@ -13,6 +16,8 @@ use codex_utils_rustls_provider::ensure_rustls_crypto_provider; use futures::SinkExt; use futures::StreamExt; use http::HeaderMap; +use http::HeaderValue; +use std::collections::HashMap; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -205,23 +210,13 @@ impl RealtimeWebsocketConnection { self.writer.send_conversation_item_create(text).await } - pub async fn send_session_update( + pub async fn send_conversation_handoff_append( &self, - backend_prompt: String, - conversation_id: Option, + handoff_id: String, + output_text: String, ) -> Result<(), ApiError> { self.writer - .send_session_update(backend_prompt, conversation_id) - .await - } - - pub async fn send_session_create( - &self, - backend_prompt: String, - conversation_id: Option, - ) -> Result<(), ApiError> { - self.writer - .send_session_create(backend_prompt, conversation_id) + .send_conversation_handoff_append(handoff_id, output_text) .await } @@ -262,13 +257,8 @@ impl RealtimeWebsocketConnection { impl RealtimeWebsocketWriter { pub async fn send_audio_frame(&self, frame: RealtimeAudioFrame) -> Result<(), ApiError> { - self.send_json(RealtimeOutboundMessage::InputAudioDelta { - delta: frame.data, - sample_rate: frame.sample_rate, - num_channels: frame.num_channels, - samples_per_channel: frame.samples_per_channel, - }) - .await + self.send_json(RealtimeOutboundMessage::InputAudioBufferAppend { audio: frame.data }) + .await } pub async fn send_conversation_item_create(&self, text: String) -> Result<(), ApiError> { @@ -285,29 +275,34 @@ impl RealtimeWebsocketWriter { .await } - pub async fn send_session_update( + pub async fn send_conversation_handoff_append( &self, - backend_prompt: String, - conversation_id: Option, + handoff_id: String, + output_text: String, ) -> Result<(), ApiError> { - self.send_json(RealtimeOutboundMessage::SessionUpdate { - session: Some(SessionUpdateSession { - backend_prompt, - conversation_id, - }), + self.send_json(RealtimeOutboundMessage::ConversationHandoffAppend { + handoff_id, + output_text, }) .await } - pub async fn send_session_create( - &self, - backend_prompt: String, - conversation_id: Option, - ) -> Result<(), ApiError> { - self.send_json(RealtimeOutboundMessage::SessionCreate { - session: SessionCreateSession { - backend_prompt, - conversation_id, + pub async fn send_session_update(&self, instructions: String) -> Result<(), ApiError> { + self.send_json(RealtimeOutboundMessage::SessionUpdate { + session: SessionUpdateSession { + kind: "quicksilver".to_string(), + instructions, + audio: SessionAudio { + input: SessionAudioInput { + format: SessionAudioFormat { + kind: "audio/pcm".to_string(), + rate: 24_000, + }, + }, + output: SessionAudioOutput { + voice: "mundo".to_string(), + }, + }, }, }) .await @@ -413,14 +408,21 @@ impl RealtimeWebsocketClient { default_headers: HeaderMap, ) -> Result { ensure_rustls_crypto_provider(); - // Keep provider base_url semantics aligned with HTTP clients; derive the ws endpoint here. - let ws_url = websocket_url_from_api_url(self.provider.base_url.as_str())?; + let ws_url = websocket_url_from_api_url( + self.provider.base_url.as_str(), + self.provider.query_params.as_ref(), + config.model.as_deref(), + )?; let mut request = ws_url .as_str() .into_client_request() .map_err(|err| ApiError::Stream(format!("failed to build websocket request: {err}")))?; - let headers = merge_request_headers(&self.provider.headers, extra_headers, default_headers); + let headers = merge_request_headers( + &self.provider.headers, + with_session_id_header(extra_headers, config.session_id.as_deref())?, + default_headers, + ); request.headers_mut().extend(headers); info!("connecting realtime websocket: {ws_url}"); @@ -439,11 +441,12 @@ impl RealtimeWebsocketClient { let (stream, rx_message) = WsStream::new(stream); let connection = RealtimeWebsocketConnection::new(stream, rx_message); debug!( - conversation_id = config.session_id.as_deref().unwrap_or(""), - "realtime websocket sending session.create" + session_id = config.session_id.as_deref().unwrap_or(""), + "realtime websocket sending session.update" ); connection - .send_session_create(config.prompt, config.session_id) + .writer + .send_session_update(config.instructions) .await?; Ok(connection) } @@ -464,38 +467,99 @@ fn merge_request_headers( headers } +fn with_session_id_header( + mut headers: HeaderMap, + session_id: Option<&str>, +) -> Result { + let Some(session_id) = session_id else { + return Ok(headers); + }; + headers.insert( + "x-session-id", + HeaderValue::from_str(session_id).map_err(|err| { + ApiError::Stream(format!("invalid realtime session id header: {err}")) + })?, + ); + Ok(headers) +} + fn websocket_config() -> WebSocketConfig { WebSocketConfig::default() } -fn websocket_url_from_api_url(api_url: &str) -> Result { +fn websocket_url_from_api_url( + api_url: &str, + query_params: Option<&HashMap>, + model: Option<&str>, +) -> Result { let mut url = Url::parse(api_url) .map_err(|err| ApiError::Stream(format!("failed to parse realtime api_url: {err}")))?; + normalize_realtime_path(&mut url); + match url.scheme() { - "ws" | "wss" => { - if url.path().is_empty() || url.path() == "/" { - url.set_path("/ws"); - } - Ok(url) - } + "ws" | "wss" => {} "http" | "https" => { - if url.path().is_empty() || url.path() == "/" { - url.set_path("/ws"); - } let scheme = if url.scheme() == "http" { "ws" } else { "wss" }; let _ = url.set_scheme(scheme); - Ok(url) } - scheme => Err(ApiError::Stream(format!( - "unsupported realtime api_url scheme: {scheme}" - ))), + scheme => { + return Err(ApiError::Stream(format!( + "unsupported realtime api_url scheme: {scheme}" + ))); + } + } + + { + let mut query = url.query_pairs_mut(); + query.append_pair("intent", "quicksilver"); + if let Some(model) = model { + query.append_pair("model", model); + } + if let Some(query_params) = query_params { + for (key, value) in query_params { + if key == "intent" || (key == "model" && model.is_some()) { + continue; + } + query.append_pair(key, value); + } + } + } + + Ok(url) +} + +fn normalize_realtime_path(url: &mut Url) { + let path = url.path().to_string(); + if path.is_empty() || path == "/" { + url.set_path("/v1/realtime"); + return; + } + + if path.ends_with("/realtime") { + return; + } + + if path.ends_with("/realtime/") { + url.set_path(path.trim_end_matches('/')); + return; + } + + if path.ends_with("/v1") { + url.set_path(&format!("{path}/realtime")); + return; + } + + if path.ends_with("/v1/") { + url.set_path(&format!("{path}realtime")); } } #[cfg(test)] mod tests { use super::*; + use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffMessage; + use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffRequested; use http::HeaderValue; use pretty_assertions::assert_eq; use serde_json::Value; @@ -507,17 +571,18 @@ mod tests { use tokio_tungstenite::tungstenite::Message; #[test] - fn parse_session_created_event() { + fn parse_session_updated_event() { let payload = json!({ - "type": "session.created", - "session": {"id": "sess_123"} + "type": "session.updated", + "session": {"id": "sess_123", "instructions": "backend prompt"} }) .to_string(); assert_eq!( parse_realtime_event(payload.as_str()), - Some(RealtimeEvent::SessionCreated { - session_id: "sess_123".to_string() + Some(RealtimeEvent::SessionUpdated { + session_id: "sess_123".to_string(), + instructions: Some("backend prompt".to_string()), }) ); } @@ -525,10 +590,10 @@ mod tests { #[test] fn parse_audio_delta_event() { let payload = json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AAA=", "sample_rate": 48000, - "num_channels": 1, + "channels": 1, "samples_per_channel": 960 }) .to_string(); @@ -547,17 +612,59 @@ mod tests { fn parse_conversation_item_added_event() { let payload = json!({ "type": "conversation.item.added", - "item": {"type": "spawn_transcript", "seq": 7} + "item": {"type": "message", "seq": 7} }) .to_string(); assert_eq!( parse_realtime_event(payload.as_str()), Some(RealtimeEvent::ConversationItemAdded( - json!({"type": "spawn_transcript", "seq": 7}) + json!({"type": "message", "seq": 7}) )) ); } + #[test] + fn parse_conversation_item_done_event() { + let payload = json!({ + "type": "conversation.item.done", + "item": {"id": "item_123", "type": "message"} + }) + .to_string(); + assert_eq!( + parse_realtime_event(payload.as_str()), + Some(RealtimeEvent::ConversationItemDone { + item_id: "item_123".to_string(), + }) + ); + } + + #[test] + fn parse_handoff_requested_event() { + let payload = json!({ + "type": "conversation.handoff.requested", + "handoff_id": "handoff_123", + "item_id": "item_123", + "input_transcript": "delegate this", + "messages": [ + {"role": "user", "text": "delegate this"} + ] + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str()), + Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { + handoff_id: "handoff_123".to_string(), + item_id: "item_123".to_string(), + input_transcript: "delegate this".to_string(), + messages: vec![RealtimeHandoffMessage { + role: "user".to_string(), + text: "delegate this".to_string(), + }], + })) + ); + } + #[test] fn merge_request_headers_matches_http_precedence() { let mut provider_headers = HeaderMap::new(); @@ -593,14 +700,61 @@ mod tests { #[test] fn websocket_url_from_http_base_defaults_to_ws_path() { - let url = websocket_url_from_api_url("http://127.0.0.1:8011").expect("build ws url"); - assert_eq!(url.as_str(), "ws://127.0.0.1:8011/ws"); + let url = + websocket_url_from_api_url("http://127.0.0.1:8011", None, None).expect("build ws url"); + assert_eq!( + url.as_str(), + "ws://127.0.0.1:8011/v1/realtime?intent=quicksilver" + ); } #[test] fn websocket_url_from_ws_base_defaults_to_ws_path() { - let url = websocket_url_from_api_url("wss://example.com").expect("build ws url"); - assert_eq!(url.as_str(), "wss://example.com/ws"); + let url = + websocket_url_from_api_url("wss://example.com", None, Some("realtime-test-model")) + .expect("build ws url"); + assert_eq!( + url.as_str(), + "wss://example.com/v1/realtime?intent=quicksilver&model=realtime-test-model" + ); + } + + #[test] + fn websocket_url_from_v1_base_appends_realtime_path() { + let url = websocket_url_from_api_url("https://api.openai.com/v1", None, Some("snapshot")) + .expect("build ws url"); + assert_eq!( + url.as_str(), + "wss://api.openai.com/v1/realtime?intent=quicksilver&model=snapshot" + ); + } + + #[test] + fn websocket_url_from_nested_v1_base_appends_realtime_path() { + let url = + websocket_url_from_api_url("https://example.com/openai/v1", None, Some("snapshot")) + .expect("build ws url"); + assert_eq!( + url.as_str(), + "wss://example.com/openai/v1/realtime?intent=quicksilver&model=snapshot" + ); + } + + #[test] + fn websocket_url_preserves_existing_realtime_path_and_extra_query_params() { + let url = websocket_url_from_api_url( + "https://example.com/v1/realtime?foo=bar", + Some(&HashMap::from([ + ("trace".to_string(), "1".to_string()), + ("intent".to_string(), "ignored".to_string()), + ])), + Some("snapshot"), + ) + .expect("build ws url"); + assert_eq!( + url.as_str(), + "wss://example.com/v1/realtime?foo=bar&intent=quicksilver&model=snapshot&trace=1" + ); } #[tokio::test] @@ -620,26 +774,38 @@ mod tests { .into_text() .expect("text"); let first_json: Value = serde_json::from_str(&first).expect("json"); - assert_eq!(first_json["type"], "session.create"); + assert_eq!(first_json["type"], "session.update"); assert_eq!( - first_json["session"]["backend_prompt"], + first_json["session"]["type"], + Value::String("quicksilver".to_string()) + ); + assert_eq!( + first_json["session"]["instructions"], Value::String("backend prompt".to_string()) ); assert_eq!( - first_json["session"]["conversation_id"], - Value::String("conv_1".to_string()) + first_json["session"]["audio"]["input"]["format"]["type"], + Value::String("audio/pcm".to_string()) + ); + assert_eq!( + first_json["session"]["audio"]["input"]["format"]["rate"], + Value::from(24_000) + ); + assert_eq!( + first_json["session"]["audio"]["output"]["voice"], + Value::String("mundo".to_string()) ); ws.send(Message::Text( json!({ - "type": "session.created", - "session": {"id": "sess_mock"} + "type": "session.updated", + "session": {"id": "sess_mock", "instructions": "backend prompt"} }) .to_string() .into(), )) .await - .expect("send session.created"); + .expect("send session.updated"); let second = ws .next() @@ -649,7 +815,7 @@ mod tests { .into_text() .expect("text"); let second_json: Value = serde_json::from_str(&second).expect("json"); - assert_eq!(second_json["type"], "response.input_audio.delta"); + assert_eq!(second_json["type"], "input_audio_buffer.append"); let third = ws .next() @@ -662,12 +828,24 @@ mod tests { assert_eq!(third_json["type"], "conversation.item.create"); assert_eq!(third_json["item"]["content"][0]["text"], "hello agent"); + let fourth = ws + .next() + .await + .expect("fourth msg") + .expect("fourth msg ok") + .into_text() + .expect("text"); + let fourth_json: Value = serde_json::from_str(&fourth).expect("json"); + assert_eq!(fourth_json["type"], "conversation.handoff.append"); + assert_eq!(fourth_json["handoff_id"], "handoff_1"); + assert_eq!(fourth_json["output_text"], "hello from codex"); + ws.send(Message::Text( json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 48000, - "num_channels": 1 + "channels": 1 }) .to_string() .into(), @@ -677,8 +855,11 @@ mod tests { ws.send(Message::Text( json!({ - "type": "conversation.item.added", - "item": {"type": "spawn_transcript", "seq": 2} + "type": "conversation.handoff.requested", + "handoff_id": "handoff_1", + "item_id": "item_2", + "input_transcript": "delegate now", + "messages": [{"role": "user", "text": "delegate now"}] }) .to_string() .into(), @@ -705,7 +886,8 @@ mod tests { let connection = client .connect( RealtimeSessionConfig { - prompt: "backend prompt".to_string(), + instructions: "backend prompt".to_string(), + model: Some("realtime-test-model".to_string()), session_id: Some("conv_1".to_string()), }, HeaderMap::new(), @@ -721,8 +903,9 @@ mod tests { .expect("event"); assert_eq!( created, - RealtimeEvent::SessionCreated { - session_id: "sess_mock".to_string() + RealtimeEvent::SessionUpdated { + session_id: "sess_mock".to_string(), + instructions: Some("backend prompt".to_string()), } ); @@ -739,6 +922,13 @@ mod tests { .send_conversation_item_create("hello agent".to_string()) .await .expect("send item"); + connection + .send_conversation_handoff_append( + "handoff_1".to_string(), + "hello from codex".to_string(), + ) + .await + .expect("send handoff"); let audio_event = connection .next_event() @@ -762,10 +952,15 @@ mod tests { .expect("event"); assert_eq!( added_event, - RealtimeEvent::ConversationItemAdded(json!({ - "type": "spawn_transcript", - "seq": 2 - })) + RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { + handoff_id: "handoff_1".to_string(), + item_id: "item_2".to_string(), + input_transcript: "delegate now".to_string(), + messages: vec![RealtimeHandoffMessage { + role: "user".to_string(), + text: "delegate now".to_string(), + }], + }) ); connection.close().await.expect("close"); @@ -789,7 +984,7 @@ mod tests { .into_text() .expect("text"); let first_json: Value = serde_json::from_str(&first).expect("json"); - assert_eq!(first_json["type"], "session.create"); + assert_eq!(first_json["type"], "session.update"); let second = ws .next() @@ -799,18 +994,18 @@ mod tests { .into_text() .expect("text"); let second_json: Value = serde_json::from_str(&second).expect("json"); - assert_eq!(second_json["type"], "response.input_audio.delta"); + assert_eq!(second_json["type"], "input_audio_buffer.append"); ws.send(Message::Text( json!({ - "type": "session.created", - "session": {"id": "sess_after_send"} + "type": "session.updated", + "session": {"id": "sess_after_send", "instructions": "backend prompt"} }) .to_string() .into(), )) .await - .expect("send session.created"); + .expect("send session.updated"); }); let provider = Provider { @@ -831,7 +1026,8 @@ mod tests { let connection = client .connect( RealtimeSessionConfig { - prompt: "backend prompt".to_string(), + instructions: "backend prompt".to_string(), + model: Some("realtime-test-model".to_string()), session_id: Some("conv_1".to_string()), }, HeaderMap::new(), @@ -862,8 +1058,9 @@ mod tests { let next_event = next_result.expect("next event").expect("event"); assert_eq!( next_event, - RealtimeEvent::SessionCreated { - session_id: "sess_after_send".to_string() + RealtimeEvent::SessionUpdated { + session_id: "sess_after_send".to_string(), + instructions: Some("backend prompt".to_string()), } ); diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs index f2f0616fc..82957c5ec 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs @@ -1,49 +1,63 @@ pub use codex_protocol::protocol::RealtimeAudioFrame; pub use codex_protocol::protocol::RealtimeEvent; +pub use codex_protocol::protocol::RealtimeHandoffMessage; +pub use codex_protocol::protocol::RealtimeHandoffRequested; use serde::Serialize; use serde_json::Value; use tracing::debug; #[derive(Debug, Clone, PartialEq, Eq)] pub struct RealtimeSessionConfig { - pub prompt: String, + pub instructions: String, + pub model: Option, pub session_id: Option, } #[derive(Debug, Clone, Serialize)] #[serde(tag = "type")] pub(super) enum RealtimeOutboundMessage { - #[serde(rename = "response.input_audio.delta")] - InputAudioDelta { - delta: String, - sample_rate: u32, - num_channels: u16, - #[serde(skip_serializing_if = "Option::is_none")] - samples_per_channel: Option, + #[serde(rename = "input_audio_buffer.append")] + InputAudioBufferAppend { audio: String }, + #[serde(rename = "conversation.handoff.append")] + ConversationHandoffAppend { + handoff_id: String, + output_text: String, }, - #[serde(rename = "session.create")] - SessionCreate { session: SessionCreateSession }, #[serde(rename = "session.update")] - SessionUpdate { - #[serde(skip_serializing_if = "Option::is_none")] - session: Option, - }, + SessionUpdate { session: SessionUpdateSession }, #[serde(rename = "conversation.item.create")] ConversationItemCreate { item: ConversationItem }, } #[derive(Debug, Clone, Serialize)] pub(super) struct SessionUpdateSession { - pub(super) backend_prompt: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub(super) conversation_id: Option, + #[serde(rename = "type")] + pub(super) kind: String, + pub(super) instructions: String, + pub(super) audio: SessionAudio, } #[derive(Debug, Clone, Serialize)] -pub(super) struct SessionCreateSession { - pub(super) backend_prompt: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub(super) conversation_id: Option, +pub(super) struct SessionAudio { + pub(super) input: SessionAudioInput, + pub(super) output: SessionAudioOutput, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct SessionAudioInput { + pub(super) format: SessionAudioFormat, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct SessionAudioFormat { + #[serde(rename = "type")] + pub(super) kind: String, + pub(super) rate: u32, +} + +#[derive(Debug, Clone, Serialize)] +pub(super) struct SessionAudioOutput { + pub(super) voice: String, } #[derive(Debug, Clone, Serialize)] @@ -78,30 +92,25 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option { } }; match message_type { - "session.created" => { - let session = parsed.get("session").and_then(Value::as_object); - let session_id = session - .and_then(|session| session.get("id")) - .and_then(Value::as_str) - .map(str::to_string) - .or_else(|| { - parsed - .get("session_id") - .and_then(Value::as_str) - .map(str::to_string) - }); - session_id.map(|id| RealtimeEvent::SessionCreated { session_id: id }) - } "session.updated" => { - let backend_prompt = parsed + let session_id = parsed .get("session") .and_then(Value::as_object) - .and_then(|session| session.get("backend_prompt")) + .and_then(|session| session.get("id")) .and_then(Value::as_str) .map(str::to_string); - Some(RealtimeEvent::SessionUpdated { backend_prompt }) + let instructions = parsed + .get("session") + .and_then(Value::as_object) + .and_then(|session| session.get("instructions")) + .and_then(Value::as_str) + .map(str::to_string); + session_id.map(|session_id| RealtimeEvent::SessionUpdated { + session_id, + instructions, + }) } - "response.output_audio.delta" => { + "conversation.output_audio.delta" => { let data = parsed .get("delta") .and_then(Value::as_str) @@ -112,7 +121,8 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option { .and_then(Value::as_u64) .and_then(|v| u32::try_from(v).ok())?; let num_channels = parsed - .get("num_channels") + .get("channels") + .or_else(|| parsed.get("num_channels")) .and_then(Value::as_u64) .and_then(|v| u16::try_from(v).ok())?; Some(RealtimeEvent::AudioOut(RealtimeAudioFrame { @@ -129,10 +139,55 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option { .get("item") .cloned() .map(RealtimeEvent::ConversationItemAdded), + "conversation.item.done" => parsed + .get("item") + .and_then(Value::as_object) + .and_then(|item| item.get("id")) + .and_then(Value::as_str) + .map(str::to_string) + .map(|item_id| RealtimeEvent::ConversationItemDone { item_id }), + "conversation.handoff.requested" => { + let handoff_id = parsed + .get("handoff_id") + .and_then(Value::as_str) + .map(str::to_string)?; + let item_id = parsed + .get("item_id") + .and_then(Value::as_str) + .map(str::to_string)?; + let input_transcript = parsed + .get("input_transcript") + .and_then(Value::as_str) + .map(str::to_string)?; + let messages = parsed + .get("messages") + .and_then(Value::as_array)? + .iter() + .filter_map(|message| { + let role = message.get("role").and_then(Value::as_str)?.to_string(); + let text = message.get("text").and_then(Value::as_str)?.to_string(); + Some(RealtimeHandoffMessage { role, text }) + }) + .collect(); + Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { + handoff_id, + item_id, + input_transcript, + messages, + })) + } "error" => parsed .get("message") .and_then(Value::as_str) .map(str::to_string) + .or_else(|| { + parsed + .get("error") + .and_then(Value::as_object) + .and_then(|error| error.get("message")) + .and_then(Value::as_str) + .map(str::to_string) + }) .or_else(|| parsed.get("error").map(std::string::ToString::to_string)) .map(RealtimeEvent::Error), _ => { diff --git a/codex-rs/codex-api/tests/realtime_websocket_e2e.rs b/codex-rs/codex-api/tests/realtime_websocket_e2e.rs index ae8b082b9..aa11b7958 100644 --- a/codex-rs/codex-api/tests/realtime_websocket_e2e.rs +++ b/codex-rs/codex-api/tests/realtime_websocket_e2e.rs @@ -78,26 +78,34 @@ async fn realtime_ws_e2e_session_create_and_event_flow() { .into_text() .expect("text"); let first_json: Value = serde_json::from_str(&first).expect("json"); - assert_eq!(first_json["type"], "session.create"); + assert_eq!(first_json["type"], "session.update"); assert_eq!( - first_json["session"]["backend_prompt"], + first_json["session"]["type"], + Value::String("quicksilver".to_string()) + ); + assert_eq!( + first_json["session"]["instructions"], Value::String("backend prompt".to_string()) ); assert_eq!( - first_json["session"]["conversation_id"], - Value::String("conv_123".to_string()) + first_json["session"]["audio"]["input"]["format"]["type"], + Value::String("audio/pcm".to_string()) + ); + assert_eq!( + first_json["session"]["audio"]["input"]["format"]["rate"], + Value::from(24_000) ); ws.send(Message::Text( json!({ - "type": "session.created", - "session": {"id": "sess_mock"} + "type": "session.updated", + "session": {"id": "sess_mock", "instructions": "backend prompt"} }) .to_string() .into(), )) .await - .expect("send session.created"); + .expect("send session.updated"); let second = ws .next() @@ -107,14 +115,14 @@ async fn realtime_ws_e2e_session_create_and_event_flow() { .into_text() .expect("text"); let second_json: Value = serde_json::from_str(&second).expect("json"); - assert_eq!(second_json["type"], "response.input_audio.delta"); + assert_eq!(second_json["type"], "input_audio_buffer.append"); ws.send(Message::Text( json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 48000, - "num_channels": 1 + "channels": 1 }) .to_string() .into(), @@ -128,7 +136,8 @@ async fn realtime_ws_e2e_session_create_and_event_flow() { let connection = client .connect( RealtimeSessionConfig { - prompt: "backend prompt".to_string(), + instructions: "backend prompt".to_string(), + model: Some("realtime-test-model".to_string()), session_id: Some("conv_123".to_string()), }, HeaderMap::new(), @@ -144,8 +153,9 @@ async fn realtime_ws_e2e_session_create_and_event_flow() { .expect("event"); assert_eq!( created, - RealtimeEvent::SessionCreated { - session_id: "sess_mock".to_string() + RealtimeEvent::SessionUpdated { + session_id: "sess_mock".to_string(), + instructions: Some("backend prompt".to_string()), } ); @@ -189,7 +199,7 @@ async fn realtime_ws_e2e_send_while_next_event_waits() { .into_text() .expect("text"); let first_json: Value = serde_json::from_str(&first).expect("json"); - assert_eq!(first_json["type"], "session.create"); + assert_eq!(first_json["type"], "session.update"); let second = ws .next() @@ -199,18 +209,18 @@ async fn realtime_ws_e2e_send_while_next_event_waits() { .into_text() .expect("text"); let second_json: Value = serde_json::from_str(&second).expect("json"); - assert_eq!(second_json["type"], "response.input_audio.delta"); + assert_eq!(second_json["type"], "input_audio_buffer.append"); ws.send(Message::Text( json!({ - "type": "session.created", - "session": {"id": "sess_after_send"} + "type": "session.updated", + "session": {"id": "sess_after_send", "instructions": "backend prompt"} }) .to_string() .into(), )) .await - .expect("send session.created"); + .expect("send session.updated"); }) .await; @@ -218,7 +228,8 @@ async fn realtime_ws_e2e_send_while_next_event_waits() { let connection = client .connect( RealtimeSessionConfig { - prompt: "backend prompt".to_string(), + instructions: "backend prompt".to_string(), + model: Some("realtime-test-model".to_string()), session_id: Some("conv_123".to_string()), }, HeaderMap::new(), @@ -249,8 +260,9 @@ async fn realtime_ws_e2e_send_while_next_event_waits() { let next_event = next_result.expect("next event").expect("event"); assert_eq!( next_event, - RealtimeEvent::SessionCreated { - session_id: "sess_after_send".to_string() + RealtimeEvent::SessionUpdated { + session_id: "sess_after_send".to_string(), + instructions: Some("backend prompt".to_string()), } ); @@ -269,7 +281,7 @@ async fn realtime_ws_e2e_disconnected_emitted_once() { .into_text() .expect("text"); let first_json: Value = serde_json::from_str(&first).expect("json"); - assert_eq!(first_json["type"], "session.create"); + assert_eq!(first_json["type"], "session.update"); ws.send(Message::Close(None)).await.expect("send close"); }) @@ -279,7 +291,8 @@ async fn realtime_ws_e2e_disconnected_emitted_once() { let connection = client .connect( RealtimeSessionConfig { - prompt: "backend prompt".to_string(), + instructions: "backend prompt".to_string(), + model: Some("realtime-test-model".to_string()), session_id: Some("conv_123".to_string()), }, HeaderMap::new(), @@ -308,7 +321,7 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() { .into_text() .expect("text"); let first_json: Value = serde_json::from_str(&first).expect("json"); - assert_eq!(first_json["type"], "session.create"); + assert_eq!(first_json["type"], "session.update"); ws.send(Message::Text( json!({ @@ -323,14 +336,14 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() { ws.send(Message::Text( json!({ - "type": "session.created", - "session": {"id": "sess_after_unknown"} + "type": "session.updated", + "session": {"id": "sess_after_unknown", "instructions": "backend prompt"} }) .to_string() .into(), )) .await - .expect("send session.created"); + .expect("send session.updated"); }) .await; @@ -338,7 +351,8 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() { let connection = client .connect( RealtimeSessionConfig { - prompt: "backend prompt".to_string(), + instructions: "backend prompt".to_string(), + model: Some("realtime-test-model".to_string()), session_id: Some("conv_123".to_string()), }, HeaderMap::new(), @@ -354,8 +368,9 @@ async fn realtime_ws_e2e_ignores_unknown_text_events() { .expect("event"); assert_eq!( event, - RealtimeEvent::SessionCreated { - session_id: "sess_after_unknown".to_string() + RealtimeEvent::SessionUpdated { + session_id: "sess_after_unknown".to_string(), + instructions: Some("backend prompt".to_string()), } ); diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index 689538102..0494eb64d 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -1657,11 +1657,15 @@ "$ref": "#/definitions/AbsolutePathBuf" }, "experimental_realtime_ws_backend_prompt": { - "description": "Experimental / do not use. Overrides only the realtime conversation websocket transport backend prompt (the `Op::RealtimeConversation` `/ws` session.create backend_prompt) without changing normal prompts.", + "description": "Experimental / do not use. Overrides only the realtime conversation websocket transport instructions (the `Op::RealtimeConversation` `/ws` session.update instructions) without changing normal prompts.", "type": "string" }, "experimental_realtime_ws_base_url": { - "description": "Experimental / do not use. Overrides only the realtime conversation websocket transport base URL (the `Op::RealtimeConversation` `/ws` connection) without changing normal provider HTTP requests.", + "description": "Experimental / do not use. Overrides only the realtime conversation websocket transport base URL (the `Op::RealtimeConversation` `/v1/realtime` connection) without changing normal provider HTTP requests.", + "type": "string" + }, + "experimental_realtime_ws_model": { + "description": "Experimental / do not use. Selects the realtime websocket model/snapshot used for the `Op::RealtimeConversation` connection.", "type": "string" }, "experimental_use_freeform_apply_patch": { diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 05981029c..202eace80 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2367,6 +2367,8 @@ impl Session { self.send_event_raw(event).await; self.maybe_mirror_event_text_to_realtime(&legacy_source) .await; + self.maybe_clear_realtime_handoff_for_event(&legacy_source) + .await; let show_raw_agent_reasoning = self.show_raw_agent_reasoning(); for legacy in legacy_source.as_legacy_events(show_raw_agent_reasoning) { @@ -2382,14 +2384,23 @@ impl Session { let Some(text) = realtime_text_for_event(msg) else { return; }; - if self.conversation.running_state().await.is_none() { + if self.conversation.running_state().await.is_none() + || self.conversation.active_handoff_id().await.is_none() + { return; } - if let Err(err) = self.conversation.text_in(text).await { + if let Err(err) = self.conversation.handoff_out(text).await { debug!("failed to mirror event text to realtime conversation: {err}"); } } + async fn maybe_clear_realtime_handoff_for_event(&self, msg: &EventMsg) { + if !matches!(msg, EventMsg::TurnComplete(_)) { + return; + } + self.conversation.clear_active_handoff().await; + } + pub(crate) async fn send_event_raw(&self, event: Event) { // Record the last known agent status. if let Some(status) = agent_status_from_event(&event.msg) { diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index b5520a2f0..e9ee6fd92 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -435,12 +435,16 @@ pub struct Config { pub realtime_audio: RealtimeAudioConfig, /// Experimental / do not use. Overrides only the realtime conversation - /// websocket transport base URL (the `Op::RealtimeConversation` `/ws` + /// websocket transport base URL (the `Op::RealtimeConversation` + /// `/v1/realtime` /// connection) without changing normal provider HTTP requests. pub experimental_realtime_ws_base_url: Option, + /// Experimental / do not use. Selects the realtime websocket model/snapshot + /// used for the `Op::RealtimeConversation` connection. + pub experimental_realtime_ws_model: Option, /// Experimental / do not use. Overrides only the realtime conversation - /// websocket transport backend prompt (the `Op::RealtimeConversation` - /// `/ws` session.create backend_prompt) without changing normal prompts. + /// websocket transport instructions (the `Op::RealtimeConversation` + /// `/ws` session.update instructions) without changing normal prompts. pub experimental_realtime_ws_backend_prompt: Option, /// When set, restricts ChatGPT login to a specific workspace identifier. pub forced_chatgpt_workspace_id: Option, @@ -1188,12 +1192,16 @@ pub struct ConfigToml { pub audio: Option, /// Experimental / do not use. Overrides only the realtime conversation - /// websocket transport base URL (the `Op::RealtimeConversation` `/ws` + /// websocket transport base URL (the `Op::RealtimeConversation` + /// `/v1/realtime` /// connection) without changing normal provider HTTP requests. pub experimental_realtime_ws_base_url: Option, + /// Experimental / do not use. Selects the realtime websocket model/snapshot + /// used for the `Op::RealtimeConversation` connection. + pub experimental_realtime_ws_model: Option, /// Experimental / do not use. Overrides only the realtime conversation - /// websocket transport backend prompt (the `Op::RealtimeConversation` - /// `/ws` session.create backend_prompt) without changing normal prompts. + /// websocket transport instructions (the `Op::RealtimeConversation` + /// `/ws` session.update instructions) without changing normal prompts. pub experimental_realtime_ws_backend_prompt: Option, pub projects: Option>, @@ -2182,6 +2190,7 @@ impl Config { speaker: audio.speaker, }), experimental_realtime_ws_base_url: cfg.experimental_realtime_ws_base_url, + experimental_realtime_ws_model: cfg.experimental_realtime_ws_model, experimental_realtime_ws_backend_prompt: cfg.experimental_realtime_ws_backend_prompt, forced_chatgpt_workspace_id, forced_login_method, @@ -4924,6 +4933,7 @@ model_verbosity = "high" chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), realtime_audio: RealtimeAudioConfig::default(), experimental_realtime_ws_base_url: None, + experimental_realtime_ws_model: None, experimental_realtime_ws_backend_prompt: None, base_instructions: None, developer_instructions: None, @@ -5052,6 +5062,7 @@ model_verbosity = "high" chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), realtime_audio: RealtimeAudioConfig::default(), experimental_realtime_ws_base_url: None, + experimental_realtime_ws_model: None, experimental_realtime_ws_backend_prompt: None, base_instructions: None, developer_instructions: None, @@ -5178,6 +5189,7 @@ model_verbosity = "high" chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), realtime_audio: RealtimeAudioConfig::default(), experimental_realtime_ws_base_url: None, + experimental_realtime_ws_model: None, experimental_realtime_ws_backend_prompt: None, base_instructions: None, developer_instructions: None, @@ -5290,6 +5302,7 @@ model_verbosity = "high" chatgpt_base_url: "https://chatgpt.com/backend-api/".to_string(), realtime_audio: RealtimeAudioConfig::default(), experimental_realtime_ws_base_url: None, + experimental_realtime_ws_model: None, experimental_realtime_ws_backend_prompt: None, base_instructions: None, developer_instructions: None, @@ -6135,6 +6148,34 @@ experimental_realtime_ws_backend_prompt = "prompt from config" Ok(()) } + #[test] + fn experimental_realtime_ws_model_loads_from_config_toml() -> std::io::Result<()> { + let cfg: ConfigToml = toml::from_str( + r#" +experimental_realtime_ws_model = "realtime-test-model" +"#, + ) + .expect("TOML deserialization should succeed"); + + assert_eq!( + cfg.experimental_realtime_ws_model.as_deref(), + Some("realtime-test-model") + ); + + let codex_home = TempDir::new()?; + let config = Config::load_from_base_config_with_overrides( + cfg, + ConfigOverrides::default(), + codex_home.path().to_path_buf(), + )?; + + assert_eq!( + config.experimental_realtime_ws_model.as_deref(), + Some("realtime-test-model") + ); + Ok(()) + } + #[test] fn realtime_audio_loads_from_config_toml() -> std::io::Result<()> { let cfg: ConfigToml = toml::from_str( diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 861dd3164..8842c41c9 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -1,5 +1,6 @@ use crate::CodexAuth; use crate::api_bridge::map_api_error; +use crate::auth::read_openai_api_key_from_env; use crate::codex::Session; use crate::default_client::default_headers; use crate::error::CodexErr; @@ -24,8 +25,10 @@ use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RealtimeConversationClosedEvent; use codex_protocol::protocol::RealtimeConversationRealtimeEvent; use codex_protocol::protocol::RealtimeConversationStartedEvent; +use codex_protocol::protocol::RealtimeHandoffRequested; use http::HeaderMap; -use serde_json::Value; +use http::HeaderValue; +use http::header::AUTHORIZATION; use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -37,17 +40,55 @@ use tracing::info; use tracing::warn; const AUDIO_IN_QUEUE_CAPACITY: usize = 256; -const TEXT_IN_QUEUE_CAPACITY: usize = 64; +const USER_TEXT_IN_QUEUE_CAPACITY: usize = 64; +const HANDOFF_OUT_QUEUE_CAPACITY: usize = 64; const OUTPUT_EVENTS_QUEUE_CAPACITY: usize = 256; pub(crate) struct RealtimeConversationManager { state: Mutex>, } +#[derive(Clone, Debug)] +struct RealtimeHandoffState { + output_tx: Sender, + active_handoff: Arc>>, +} + +#[derive(Debug, PartialEq, Eq)] +struct HandoffOutput { + handoff_id: String, + output_text: String, +} + +impl RealtimeHandoffState { + fn new(output_tx: Sender) -> Self { + Self { + output_tx, + active_handoff: Arc::new(Mutex::new(None)), + } + } + + async fn send_output(&self, output_text: String) -> CodexResult<()> { + let Some(handoff_id) = self.active_handoff.lock().await.clone() else { + return Ok(()); + }; + + self.output_tx + .send(HandoffOutput { + handoff_id, + output_text, + }) + .await + .map_err(|_| CodexErr::InvalidRequest("conversation is not running".to_string()))?; + Ok(()) + } +} + #[allow(dead_code)] struct ConversationState { audio_tx: Sender, - text_tx: Sender, + user_text_tx: Sender, + handoff: RealtimeHandoffState, task: JoinHandle<()>, realtime_active: Arc, } @@ -72,6 +113,7 @@ impl RealtimeConversationManager { api_provider: ApiProvider, extra_headers: Option, prompt: String, + model: Option, session_id: Option, ) -> CodexResult<(Receiver, Arc)> { let previous_state = { @@ -84,7 +126,11 @@ impl RealtimeConversationManager { let _ = state.task.await; } - let session_config = RealtimeSessionConfig { prompt, session_id }; + let session_config = RealtimeSessionConfig { + instructions: prompt, + model, + session_id, + }; let client = RealtimeWebsocketClient::new(api_provider); let connection = client .connect( @@ -99,17 +145,30 @@ impl RealtimeConversationManager { let events = connection.events(); let (audio_tx, audio_rx) = async_channel::bounded::(AUDIO_IN_QUEUE_CAPACITY); - let (text_tx, text_rx) = async_channel::bounded::(TEXT_IN_QUEUE_CAPACITY); + let (user_text_tx, user_text_rx) = + async_channel::bounded::(USER_TEXT_IN_QUEUE_CAPACITY); + let (handoff_output_tx, handoff_output_rx) = + async_channel::bounded::(HANDOFF_OUT_QUEUE_CAPACITY); let (events_tx, events_rx) = async_channel::bounded::(OUTPUT_EVENTS_QUEUE_CAPACITY); let realtime_active = Arc::new(AtomicBool::new(true)); - let task = spawn_realtime_input_task(writer, events, text_rx, audio_rx, events_tx); + let handoff = RealtimeHandoffState::new(handoff_output_tx); + let task = spawn_realtime_input_task( + writer, + events, + user_text_rx, + handoff_output_rx, + audio_rx, + events_tx, + handoff.clone(), + ); let mut guard = self.state.lock().await; *guard = Some(ConversationState { audio_tx, - text_tx, + user_text_tx, + handoff, task, realtime_active: Arc::clone(&realtime_active), }); @@ -143,7 +202,7 @@ impl RealtimeConversationManager { pub(crate) async fn text_in(&self, text: String) -> CodexResult<()> { let sender = { let guard = self.state.lock().await; - guard.as_ref().map(|state| state.text_tx.clone()) + guard.as_ref().map(|state| state.user_text_tx.clone()) }; let Some(sender) = sender else { @@ -159,6 +218,38 @@ impl RealtimeConversationManager { Ok(()) } + pub(crate) async fn handoff_out(&self, output_text: String) -> CodexResult<()> { + let handoff = { + let guard = self.state.lock().await; + let Some(state) = guard.as_ref() else { + return Err(CodexErr::InvalidRequest( + "conversation is not running".to_string(), + )); + }; + state.handoff.clone() + }; + + handoff.send_output(output_text).await + } + + pub(crate) async fn active_handoff_id(&self) -> Option { + let handoff = { + let guard = self.state.lock().await; + guard.as_ref().map(|state| state.handoff.clone()) + }?; + handoff.active_handoff.lock().await.clone() + } + + pub(crate) async fn clear_active_handoff(&self) { + let handoff = { + let guard = self.state.lock().await; + guard.as_ref().map(|state| state.handoff.clone()) + }; + if let Some(handoff) = handoff { + *handoff.active_handoff.lock().await = None; + } + } + pub(crate) async fn shutdown(&self) -> CodexResult<()> { let state = { let mut guard = self.state.lock().await; @@ -181,7 +272,8 @@ pub(crate) async fn handle_start( ) -> CodexResult<()> { let provider = sess.provider().await; let auth = sess.services.auth_manager.auth().await; - let mut api_provider = provider.to_api_provider(auth.as_ref().map(CodexAuth::auth_mode))?; + let realtime_api_key = realtime_api_key(auth.as_ref(), &provider)?; + let mut api_provider = provider.to_api_provider(Some(crate::auth::AuthMode::ApiKey))?; let config = sess.get_config().await; if let Some(realtime_ws_base_url) = &config.experimental_realtime_ws_base_url { api_provider.base_url = realtime_ws_base_url.clone(); @@ -190,14 +282,23 @@ pub(crate) async fn handle_start( .experimental_realtime_ws_backend_prompt .clone() .unwrap_or(params.prompt); + let model = config.experimental_realtime_ws_model.clone(); let requested_session_id = params .session_id .or_else(|| Some(sess.conversation_id.to_string())); + let extra_headers = + realtime_request_headers(requested_session_id.as_deref(), realtime_api_key.as_str())?; info!("starting realtime conversation"); let (events_rx, realtime_active) = match sess .conversation - .start(api_provider, None, prompt, requested_session_id.clone()) + .start( + api_provider, + extra_headers, + prompt, + model, + requested_session_id.clone(), + ) .await { Ok(events_rx) => events_rx, @@ -227,8 +328,8 @@ pub(crate) async fn handle_start( while let Ok(event) = events_rx.recv().await { debug!(conversation_id = %sess_clone.conversation_id, "received realtime conversation event"); let maybe_routed_text = match &event { - RealtimeEvent::ConversationItemAdded(item) => { - realtime_text_from_conversation_item(item) + RealtimeEvent::HandoffRequested(handoff) => { + realtime_text_from_handoff_request(handoff) } _ => None, }; @@ -271,26 +372,57 @@ pub(crate) async fn handle_audio( } } -fn realtime_text_from_conversation_item(item: &Value) -> Option { - match item.get("type").and_then(Value::as_str) { - Some("message") => { - if item.get("role").and_then(Value::as_str) != Some("assistant") { - return None; - } - let content = item.get("content")?.as_array()?; - let text = content - .iter() - .filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text")) - .filter_map(|entry| entry.get("text").and_then(Value::as_str)) - .collect::(); - if text.is_empty() { None } else { Some(text) } - } - Some("spawn_transcript") => item - .get("delta_user_transcript") - .and_then(Value::as_str) - .and_then(|text| (!text.is_empty()).then(|| text.to_string())), - Some(_) | None => None, +fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Option { + (!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone()) +} + +fn realtime_api_key( + auth: Option<&CodexAuth>, + provider: &crate::ModelProviderInfo, +) -> CodexResult { + if let Some(api_key) = provider.api_key()? { + return Ok(api_key); } + + if let Some(token) = provider.experimental_bearer_token.clone() { + return Ok(token); + } + + if let Some(api_key) = auth.and_then(CodexAuth::api_key) { + return Ok(api_key.to_string()); + } + + // TODO(aibrahim): Remove this temporary fallback once realtime auth no longer + // requires API key auth for ChatGPT/SIWC sessions. + if provider.is_openai() + && let Some(api_key) = read_openai_api_key_from_env() + { + return Ok(api_key); + } + + Err(CodexErr::InvalidRequest( + "realtime conversation requires API key auth".to_string(), + )) +} + +fn realtime_request_headers( + session_id: Option<&str>, + api_key: &str, +) -> CodexResult> { + let mut headers = HeaderMap::new(); + + if let Some(session_id) = session_id + && let Ok(session_id) = HeaderValue::from_str(session_id) + { + headers.insert("x-session-id", session_id); + } + + let auth_value = HeaderValue::from_str(&format!("Bearer {api_key}")).map_err(|err| { + CodexErr::InvalidRequest(format!("invalid realtime api key header: {err}")) + })?; + headers.insert(AUTHORIZATION, auth_value); + + Ok(Some(headers)) } pub(crate) async fn handle_text( @@ -326,14 +458,16 @@ pub(crate) async fn handle_close(sess: &Arc, sub_id: String) { fn spawn_realtime_input_task( writer: RealtimeWebsocketWriter, events: RealtimeWebsocketEvents, - text_rx: Receiver, + user_text_rx: Receiver, + handoff_output_rx: Receiver, audio_rx: Receiver, events_tx: Sender, + handoff_state: RealtimeHandoffState, ) -> JoinHandle<()> { tokio::spawn(async move { loop { tokio::select! { - text = text_rx.recv() => { + text = user_text_rx.recv() => { match text { Ok(text) => { if let Err(err) = writer.send_conversation_item_create(text).await { @@ -345,9 +479,31 @@ fn spawn_realtime_input_task( Err(_) => break, } } + handoff_output = handoff_output_rx.recv() => { + match handoff_output { + Ok(HandoffOutput { + handoff_id, + output_text, + }) => { + if let Err(err) = writer + .send_conversation_handoff_append(handoff_id, output_text) + .await + { + let mapped_error = map_api_error(err); + warn!("failed to send handoff output: {mapped_error}"); + break; + } + } + Err(_) => break, + } + } event = events.next_event() => { match event { Ok(Some(event)) => { + if let RealtimeEvent::HandoffRequested(handoff) = &event { + *handoff_state.active_handoff.lock().await = + Some(handoff.handoff_id.clone()); + } let should_stop = matches!(&event, RealtimeEvent::Error(_)); if events_tx.send(event).await.is_err() { break; @@ -414,82 +570,98 @@ async fn send_conversation_error( #[cfg(test)] mod tests { - use super::realtime_text_from_conversation_item; + use super::HandoffOutput; + use super::RealtimeHandoffState; + use super::realtime_text_from_handoff_request; + use async_channel::bounded; + use codex_protocol::protocol::RealtimeHandoffMessage; + use codex_protocol::protocol::RealtimeHandoffRequested; use pretty_assertions::assert_eq; - use serde_json::json; #[test] - fn extracts_text_from_assistant_message_items_only() { - let assistant = json!({ - "type": "message", - "role": "assistant", - "content": [{"type": "text", "text": "hello"}], - }); + fn extracts_text_from_handoff_request_input_transcript() { + let handoff = RealtimeHandoffRequested { + handoff_id: "handoff_1".to_string(), + item_id: "item_1".to_string(), + input_transcript: "hello".to_string(), + messages: vec![RealtimeHandoffMessage { + role: "user".to_string(), + text: "hello".to_string(), + }], + }; assert_eq!( - realtime_text_from_conversation_item(&assistant), + realtime_text_from_handoff_request(&handoff), Some("hello".to_string()) ); - - let user = json!({ - "type": "message", - "role": "user", - "content": [{"type": "text", "text": "world"}], - }); - assert_eq!(realtime_text_from_conversation_item(&user), None); } #[test] - fn extracts_and_concatenates_text_entries_only() { - let item = json!({ - "type": "message", - "role": "assistant", - "content": [ - {"type": "text", "text": "a"}, - {"type": "ignored", "text": "x"}, - {"type": "text", "text": "b"} - ], - }); - assert_eq!( - realtime_text_from_conversation_item(&item), - Some("ab".to_string()) - ); + fn ignores_empty_handoff_request_input_transcript() { + let handoff = RealtimeHandoffRequested { + handoff_id: "handoff_1".to_string(), + item_id: "item_1".to_string(), + input_transcript: String::new(), + messages: vec![], + }; + assert_eq!(realtime_text_from_handoff_request(&handoff), None); } - #[test] - fn ignores_non_message_or_missing_text() { - let non_message = json!({ - "type": "tool_call", - "content": [{"type": "text", "text": "nope"}], - }); - assert_eq!(realtime_text_from_conversation_item(&non_message), None); + #[tokio::test] + async fn clears_active_handoff_explicitly() { + let (tx, _rx) = bounded(1); + let state = RealtimeHandoffState::new(tx); - let no_text = json!({ - "type": "message", - "role": "assistant", - "content": [{"type": "other", "value": 1}], - }); - assert_eq!(realtime_text_from_conversation_item(&no_text), None); - - let empty_spawn_transcript = json!({ - "type": "spawn_transcript", - "delta_user_transcript": "", - }); + *state.active_handoff.lock().await = Some("handoff_1".to_string()); assert_eq!( - realtime_text_from_conversation_item(&empty_spawn_transcript), - None + state.active_handoff.lock().await.clone(), + Some("handoff_1".to_string()) ); + + *state.active_handoff.lock().await = None; + assert_eq!(state.active_handoff.lock().await.clone(), None); } - #[test] - fn extracts_text_from_spawn_transcript_items() { - let item = json!({ - "type": "spawn_transcript", - "delta_user_transcript": "delegate from transcript", - "backend_prompt_messages": [{"role": "user", "content": "delegate from transcript"}], - }); + #[tokio::test] + async fn sends_multiple_handoff_outputs_until_cleared() { + let (tx, rx) = bounded(4); + let state = RealtimeHandoffState::new(tx); + + state + .send_output("ignored".to_string()) + .await + .expect("send"); + assert!(rx.is_empty()); + + *state.active_handoff.lock().await = Some("handoff_1".to_string()); + state.send_output("result".to_string()).await.expect("send"); + state + .send_output("result 2".to_string()) + .await + .expect("send"); + + let output_1 = rx.recv().await.expect("recv"); assert_eq!( - realtime_text_from_conversation_item(&item), - Some("delegate from transcript".to_string()) + output_1, + HandoffOutput { + handoff_id: "handoff_1".to_string(), + output_text: "result".to_string(), + } ); + + let output_2 = rx.recv().await.expect("recv"); + assert_eq!( + output_2, + HandoffOutput { + handoff_id: "handoff_1".to_string(), + output_text: "result 2".to_string(), + } + ); + + *state.active_handoff.lock().await = None; + state + .send_output("ignored after clear".to_string()) + .await + .expect("send"); + assert!(rx.is_empty()); } } diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index bb6de200a..c1a40e90f 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -390,10 +390,15 @@ impl WebSocketRequest { #[derive(Debug, Clone)] pub struct WebSocketHandshake { + uri: String, headers: Vec<(String, String)>, } impl WebSocketHandshake { + pub fn uri(&self) -> &str { + &self.uri + } + pub fn header(&self, name: &str) -> Option { self.headers .iter() @@ -1223,10 +1228,10 @@ pub async fn start_websocket_server_with_headers( .map(|value| (name.as_str().to_string(), value.to_string())) }) .collect(); - handshake_log - .lock() - .unwrap() - .push(WebSocketHandshake { headers }); + handshake_log.lock().unwrap().push(WebSocketHandshake { + uri: req.uri().to_string(), + headers, + }); let headers_mut = response.headers_mut(); for (name, value) in &response_headers { diff --git a/codex-rs/core/tests/common/test_codex.rs b/codex-rs/core/tests/common/test_codex.rs index bd15c6d7e..362bbbf62 100644 --- a/codex-rs/core/tests/common/test_codex.rs +++ b/codex-rs/core/tests/common/test_codex.rs @@ -126,6 +126,7 @@ impl TestCodexBuilder { let base_url_clone = base_url.clone(); self.config_mutators.push(Box::new(move |config| { config.model_provider.base_url = Some(base_url_clone); + config.experimental_realtime_ws_model = Some("realtime-test-model".to_string()); config.features.enable(Feature::ResponsesWebsockets); })); self.build_with_home_and_base_url(base_url, home, None) diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 223298878..6b5b21eb3 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -86,7 +86,7 @@ fn remote_realtime_test_codex_builder( ) -> TestCodexBuilder { let realtime_base_url = realtime_server.uri().to_string(); test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_auth(CodexAuth::from_api_key("dummy")) .with_config(move |config| { config.experimental_realtime_ws_base_url = Some(realtime_base_url); }) @@ -95,8 +95,8 @@ fn remote_realtime_test_codex_builder( async fn start_remote_realtime_server() -> responses::WebSocketTestServer { start_websocket_server(vec![vec![ vec![json!({ - "type": "session.created", - "session": { "id": "sess_remote_compact" } + "type": "session.updated", + "session": { "id": "sess_remote_compact", "instructions": "backend prompt" } })], // Keep the websocket open after startup so routed transcript items during the test do not // exhaust the scripted responses and mark realtime inactive before the assertions run. @@ -130,7 +130,7 @@ async fn start_realtime_conversation(codex: &codex_core::CodexThread) -> Result< wait_for_event_match(codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) => Some(session_id.clone()), _ => None, }) diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index b6191bc88..32500299c 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -1,4 +1,6 @@ use anyhow::Result; +use codex_core::CodexAuth; +use codex_core::auth::OPENAI_API_KEY_ENV_VAR; use codex_protocol::protocol::CodexErrorInfo; use codex_protocol::protocol::ConversationAudioParams; use codex_protocol::protocol::ConversationStartParams; @@ -22,6 +24,7 @@ use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; use serde_json::Value; use serde_json::json; +use std::ffi::OsString; use std::time::Duration; use tokio::sync::oneshot; @@ -33,16 +36,16 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { vec![], vec![ vec![json!({ - "type": "session.created", - "session": { "id": "sess_1" } + "type": "session.updated", + "session": { "id": "sess_1", "instructions": "backend prompt" } })], vec![], vec![ json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 24000, - "num_channels": 1 + "channels": 1 }), json!({ "type": "conversation.item.added", @@ -77,14 +80,14 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); assert!(started.session_id.is_some()); - let session_created = wait_for_event_match(&test.codex, |msg| match msg { + let session_updated = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) => Some(session_id.clone()), _ => None, }) .await; - assert_eq!(session_created, "sess_1"); + assert_eq!(session_updated, "sess_1"); test.codex .submit(Op::RealtimeConversationAudio(ConversationAudioParams { @@ -117,17 +120,29 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { assert_eq!(connection.len(), 3); assert_eq!( connection[0].body_json()["type"].as_str(), - Some("session.create") + Some("session.update") ); assert_eq!( - connection[0].body_json()["session"]["conversation_id"] - .as_str() - .expect("session.create conversation_id"), + connection[0].body_json()["session"]["instructions"].as_str(), + Some("backend prompt") + ); + assert_eq!( + server.handshakes()[1] + .header("x-session-id") + .expect("session.update x-session-id header"), started .session_id .as_deref() .expect("started session id should be present") ); + assert_eq!( + server.handshakes()[1].header("authorization").as_deref(), + Some("Bearer dummy") + ); + assert_eq!( + server.handshakes()[1].uri(), + "/v1/realtime?intent=quicksilver&model=realtime-test-model" + ); let mut request_types = [ connection[1].body_json()["type"] .as_str() @@ -143,7 +158,7 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { request_types, [ "conversation.item.create".to_string(), - "response.input_audio.delta".to_string(), + "input_audio_buffer.append".to_string(), ] ); @@ -162,15 +177,74 @@ async fn conversation_start_audio_text_close_round_trip() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_start_uses_openai_env_key_fallback_with_chatgpt_auth() -> Result<()> { + skip_if_no_network!(Ok(())); + + let _env_guard = EnvGuard::set(OPENAI_API_KEY_ENV_VAR, "env-realtime-key"); + let server = start_websocket_server(vec![ + vec![], + vec![vec![json!({ + "type": "session.updated", + "session": { "id": "sess_env", "instructions": "backend prompt" } + })]], + ]) + .await; + + let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + let test = builder.build_with_websocket_server(&server).await?; + assert!(server.wait_for_handshakes(1, Duration::from_secs(2)).await); + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let started = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationStarted(started) => Some(Ok(started.clone())), + EventMsg::Error(err) => Some(Err(err.clone())), + _ => None, + }) + .await + .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); + assert!(started.session_id.is_some()); + + let session_updated = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) => Some(session_id.clone()), + _ => None, + }) + .await; + assert_eq!(session_updated, "sess_env"); + + assert_eq!( + server.handshakes()[1].header("authorization").as_deref(), + Some("Bearer env-realtime-key") + ); + + test.codex.submit(Op::RealtimeConversationClose).await?; + let _closed = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), + _ => None, + }) + .await; + + server.shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_transport_close_emits_closed_event() -> Result<()> { skip_if_no_network!(Ok(())); - let session_created = vec![json!({ - "type": "session.created", - "session": { "id": "sess_1" } + let session_updated = vec![json!({ + "type": "session.updated", + "session": { "id": "sess_1", "instructions": "backend prompt" } })]; - let server = start_websocket_server(vec![vec![], vec![session_created]]).await; + let server = start_websocket_server(vec![vec![], vec![session_updated]]).await; let mut builder = test_codex(); let test = builder.build_with_websocket_server(&server).await?; @@ -192,14 +266,14 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> { .unwrap_or_else(|err: ErrorEvent| panic!("conversation start failed: {err:?}")); assert!(started.session_id.is_some()); - let session_created = wait_for_event_match(&test.codex, |msg| match msg { + let session_updated = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) => Some(session_id.clone()), _ => None, }) .await; - assert_eq!(session_created, "sess_1"); + assert_eq!(session_updated, "sess_1"); let closed = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationClosed(closed) => Some(closed.clone()), @@ -212,6 +286,34 @@ async fn conversation_transport_close_emits_closed_event() -> Result<()> { Ok(()) } +struct EnvGuard { + key: &'static str, + original: Option, +} + +impl EnvGuard { + fn set(key: &'static str, value: &str) -> Self { + let original = std::env::var_os(key); + // SAFETY: this guard restores the original value before the test exits. + unsafe { + std::env::set_var(key, value); + } + Self { key, original } + } +} + +impl Drop for EnvGuard { + fn drop(&mut self) { + // SAFETY: this guard restores the original value for the modified env var. + unsafe { + match &self.original { + Some(value) => std::env::set_var(self.key, value), + None => std::env::remove_var(self.key), + } + } + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn conversation_audio_before_start_emits_error() -> Result<()> { skip_if_no_network!(Ok(())); @@ -276,19 +378,19 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { let server = start_websocket_server(vec![ vec![], vec![vec![json!({ - "type": "session.created", - "session": { "id": "sess_old" } + "type": "session.updated", + "session": { "id": "sess_old", "instructions": "old" } })]], vec![ vec![json!({ - "type": "session.created", - "session": { "id": "sess_new" } + "type": "session.updated", + "session": { "id": "sess_new", "instructions": "new" } })], vec![json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 24000, - "num_channels": 1 + "channels": 1 })], ], ]) @@ -305,7 +407,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { .await?; wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) if session_id == "sess_old" => Some(Ok(())), EventMsg::Error(err) => Some(Err(err.clone())), _ => None, @@ -321,7 +423,7 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { .await?; wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) if session_id == "sess_new" => Some(Ok(())), EventMsg::Error(err) => Some(Err(err.clone())), _ => None, @@ -351,17 +453,25 @@ async fn conversation_second_start_replaces_runtime() -> Result<()> { assert_eq!(connections.len(), 3); assert_eq!(connections[1].len(), 1); assert_eq!( - connections[1][0].body_json()["session"]["conversation_id"].as_str(), + connections[1][0].body_json()["session"]["instructions"].as_str(), + Some("old") + ); + assert_eq!( + server.handshakes()[1].header("x-session-id").as_deref(), Some("conv_old") ); assert_eq!(connections[2].len(), 2); assert_eq!( - connections[2][0].body_json()["session"]["conversation_id"].as_str(), + connections[2][0].body_json()["session"]["instructions"].as_str(), + Some("new") + ); + assert_eq!( + server.handshakes()[2].header("x-session-id").as_deref(), Some("conv_new") ); assert_eq!( connections[2][1].body_json()["type"].as_str(), - Some("response.input_audio.delta") + Some("input_audio_buffer.append") ); server.shutdown().await; @@ -374,8 +484,8 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul let startup_server = start_websocket_server(vec![vec![]]).await; let realtime_server = start_websocket_server(vec![vec![vec![json!({ - "type": "session.created", - "session": { "id": "sess_override" } + "type": "session.updated", + "session": { "id": "sess_override", "instructions": "backend prompt" } })]]]) .await; @@ -399,14 +509,14 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul })) .await?; - let session_created = wait_for_event_match(&test.codex, |msg| match msg { + let session_updated = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) => Some(session_id.clone()), _ => None, }) .await; - assert_eq!(session_created, "sess_override"); + assert_eq!(session_updated, "sess_override"); let startup_connections = startup_server.connections(); assert_eq!(startup_connections.len(), 1); @@ -415,7 +525,7 @@ async fn conversation_uses_experimental_realtime_ws_base_url_override() -> Resul assert_eq!(realtime_connections.len(), 1); assert_eq!( realtime_connections[0][0].body_json()["type"].as_str(), - Some("session.create") + Some("session.update") ); startup_server.shutdown().await; @@ -430,8 +540,8 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() -> let server = start_websocket_server(vec![ vec![], vec![vec![json!({ - "type": "session.created", - "session": { "id": "sess_override" } + "type": "session.updated", + "session": { "id": "sess_override", "instructions": "prompt from config" } })]], ]) .await; @@ -449,19 +559,19 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() -> })) .await?; - let session_created = wait_for_event_match(&test.codex, |msg| match msg { + let session_updated = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) => Some(session_id.clone()), _ => None, }) .await; - assert_eq!(session_created, "sess_override"); + assert_eq!(session_updated, "sess_override"); let connections = server.connections(); assert_eq!(connections.len(), 2); assert_eq!( - connections[1][0].body_json()["session"]["backend_prompt"].as_str(), + connections[1][0].body_json()["session"]["instructions"].as_str(), Some("prompt from config") ); @@ -470,7 +580,7 @@ async fn conversation_uses_experimental_realtime_ws_backend_prompt_override() -> } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() -> Result<()> { +async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Result<()> { skip_if_no_network!(Ok(())); let api_server = start_mock_server().await; @@ -485,10 +595,19 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() -> .await; let realtime_server = start_websocket_server(vec![vec![ - vec![json!({ - "type": "session.created", - "session": { "id": "sess_1" } - })], + vec![ + json!({ + "type": "session.updated", + "session": { "id": "sess_1", "instructions": "backend prompt" } + }), + json!({ + "type": "conversation.handoff.requested", + "handoff_id": "handoff_1", + "item_id": "item_1", + "input_transcript": "delegate hello", + "messages": [{ "role": "user", "text": "delegate hello" }] + }), + ], vec![], ]]) .await; @@ -508,16 +627,27 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() -> })) .await?; - let session_created = wait_for_event_match(&test.codex, |msg| match msg { + let session_updated = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) => Some(session_id.clone()), _ => None, }) .await; - assert_eq!(session_created, "sess_1"); + assert_eq!(session_updated, "sess_1"); - test.submit_turn("hello").await?; + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::HandoffRequested(handoff), + }) if handoff.handoff_id == "handoff_1" => Some(()), + _ => None, + }) + .await; + + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; let deadline = tokio::time::Instant::now() + Duration::from_secs(2); while tokio::time::Instant::now() < deadline { @@ -533,14 +663,18 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() -> assert_eq!(realtime_connections[0].len(), 2); assert_eq!( realtime_connections[0][0].body_json()["type"].as_str(), - Some("session.create") + Some("session.update") ); assert_eq!( realtime_connections[0][1].body_json()["type"].as_str(), - Some("conversation.item.create") + Some("conversation.handoff.append") ); assert_eq!( - realtime_connections[0][1].body_json()["item"]["content"][0]["text"].as_str(), + realtime_connections[0][1].body_json()["handoff_id"].as_str(), + Some("handoff_1") + ); + assert_eq!( + realtime_connections[0][1].body_json()["output_text"].as_str(), Some("assistant says hi") ); @@ -548,6 +682,145 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_websocket() -> Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn conversation_handoff_persists_across_item_done_until_turn_complete() -> Result<()> { + skip_if_no_network!(Ok(())); + + let (gate_second_message_tx, gate_second_message_rx) = oneshot::channel(); + let first_chunks = vec![ + StreamingSseChunk { + gate: None, + body: sse_event(responses::ev_response_created("resp-1")), + }, + StreamingSseChunk { + gate: None, + body: sse_event(responses::ev_assistant_message( + "msg-1", + "assistant message 1", + )), + }, + StreamingSseChunk { + gate: Some(gate_second_message_rx), + body: sse_event(responses::ev_assistant_message( + "msg-2", + "assistant message 2", + )), + }, + StreamingSseChunk { + gate: None, + body: sse_event(responses::ev_completed("resp-1")), + }, + ]; + let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await; + + let realtime_server = start_websocket_server(vec![vec![ + vec![ + json!({ + "type": "session.updated", + "session": { "id": "sess_item_done", "instructions": "backend prompt" } + }), + json!({ + "type": "conversation.handoff.requested", + "handoff_id": "handoff_item_done", + "item_id": "item_item_done", + "input_transcript": "delegate now", + "messages": [{ "role": "user", "text": "delegate now" }] + }), + ], + vec![json!({ + "type": "conversation.item.done", + "item": { "id": "item_item_done" } + })], + vec![], + ]]) + .await; + + let mut builder = test_codex().with_config({ + let realtime_base_url = realtime_server.uri().to_string(); + move |config| { + config.experimental_realtime_ws_base_url = Some(realtime_base_url); + } + }); + let test = builder.build_with_streaming_server(&api_server).await?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) if session_id == "sess_item_done" => Some(()), + _ => None, + }) + .await; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::HandoffRequested(handoff), + }) if handoff.handoff_id == "handoff_item_done" => Some(()), + _ => None, + }) + .await; + + let first_append = realtime_server.wait_for_request(0, 1).await; + assert_eq!( + first_append.body_json()["type"].as_str(), + Some("conversation.handoff.append") + ); + assert_eq!( + first_append.body_json()["handoff_id"].as_str(), + Some("handoff_item_done") + ); + assert_eq!( + first_append.body_json()["output_text"].as_str(), + Some("assistant message 1") + ); + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::ConversationItemDone { item_id }, + }) if item_id == "item_item_done" => Some(()), + _ => None, + }) + .await; + + let _ = gate_second_message_tx.send(()); + + let second_append = realtime_server.wait_for_request(0, 2).await; + assert_eq!( + second_append.body_json()["type"].as_str(), + Some("conversation.handoff.append") + ); + assert_eq!( + second_append.body_json()["handoff_id"].as_str(), + Some("handoff_item_done") + ); + assert_eq!( + second_append.body_json()["output_text"].as_str(), + Some("assistant message 2") + ); + + let completion = completions + .into_iter() + .next() + .expect("missing delegated turn completion"); + completion + .await + .expect("delegated turn request did not complete"); + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + realtime_server.shutdown().await; + api_server.shutdown().await; + Ok(()) +} + fn sse_event(event: Value) -> String { responses::sse(vec![event]) } @@ -567,7 +840,7 @@ fn message_input_texts(body: &Value, role: &str) -> Vec { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> { +async fn inbound_handoff_request_starts_turn() -> Result<()> { skip_if_no_network!(Ok(())); let api_server = start_mock_server().await; @@ -583,16 +856,15 @@ async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> { let realtime_server = start_websocket_server(vec![vec![vec![ json!({ - "type": "session.created", - "session": { "id": "sess_inbound" } + "type": "session.updated", + "session": { "id": "sess_inbound", "instructions": "backend prompt" } }), json!({ - "type": "conversation.item.added", - "item": { - "type": "message", - "role": "assistant", - "content": [{"type": "text", "text": "text from realtime"}] - } + "type": "conversation.handoff.requested", + "handoff_id": "handoff_inbound", + "item_id": "item_inbound", + "input_transcript": "text from realtime", + "messages": [{ "role": "user", "text": "text from realtime" }] }), ]]]) .await; @@ -612,14 +884,26 @@ async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> { })) .await?; - let session_created = wait_for_event_match(&test.codex, |msg| match msg { + let session_updated = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) => Some(session_id.clone()), _ => None, }) .await; - assert_eq!(session_created, "sess_inbound"); + assert_eq!(session_updated, "sess_inbound"); + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::HandoffRequested(handoff), + }) if handoff.handoff_id == "handoff_inbound" + && handoff.input_transcript == "text from realtime" => + { + Some(()) + } + _ => None, + }) + .await; wait_for_event(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)) @@ -635,15 +919,15 @@ async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> Result<()> { +async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio() -> Result<()> { skip_if_no_network!(Ok(())); let api_server = start_mock_server().await; let realtime_server = start_websocket_server(vec![vec![vec![ json!({ - "type": "session.created", - "session": { "id": "sess_ignore_user_role" } + "type": "session.updated", + "session": { "id": "sess_ignore_item", "instructions": "backend prompt" } }), json!({ "type": "conversation.item.added", @@ -654,10 +938,10 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R } }), json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 24000, - "num_channels": 1 + "channels": 1 }), ]]]) .await; @@ -679,8 +963,8 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, - }) if session_id == "sess_ignore_user_role" => Some(()), + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) if session_id == "sess_ignore_item" => Some(()), _ => None, }) .await; @@ -695,7 +979,7 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R }), ) .await - .expect("timed out waiting for realtime audio after user-role conversation item"); + .expect("timed out waiting for realtime audio after conversation item"); assert_eq!(audio_out.data, "AQID"); let unexpected_turn_started = tokio::time::timeout( @@ -741,16 +1025,15 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au let realtime_server = start_websocket_server(vec![vec![ vec![ json!({ - "type": "session.created", - "session": { "id": "sess_echo_guard" } + "type": "session.updated", + "session": { "id": "sess_echo_guard", "instructions": "backend prompt" } }), json!({ - "type": "conversation.item.added", - "item": { - "type": "message", - "role": "assistant", - "content": [{"type": "text", "text": "delegate now"}] - } + "type": "conversation.handoff.requested", + "handoff_id": "handoff_echo_guard", + "item_id": "item_echo_guard", + "input_transcript": "delegate now", + "messages": [{"role": "user", "text": "delegate now"}] }), ], vec![ @@ -763,10 +1046,10 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au } }), json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 24000, - "num_channels": 1 + "channels": 1 }), ], ]]) @@ -789,7 +1072,7 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) if session_id == "sess_echo_guard" => Some(()), _ => None, }) @@ -797,14 +1080,8 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::ConversationItemAdded(item), - }) => item - .get("content") - .and_then(Value::as_array) - .into_iter() - .flatten() - .any(|content| content.get("text").and_then(Value::as_str) == Some("delegate now")) - .then_some(()), + payload: RealtimeEvent::HandoffRequested(handoff), + }) if handoff.input_transcript == "delegate now" => Some(()), _ => None, }) .await; @@ -817,19 +1094,22 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au let mirrored_request = realtime_server.wait_for_request(0, 1).await; let mirrored_request_body = mirrored_request.body_json(); eprintln!( - "[realtime test +{}ms] saw mirrored request type={:?} role={:?} text={:?} data={:?}", + "[realtime test +{}ms] saw mirrored request type={:?} handoff_id={:?} text={:?}", start.elapsed().as_millis(), mirrored_request_body["type"].as_str(), - mirrored_request_body["item"]["role"].as_str(), - mirrored_request_body["item"]["content"][0]["text"].as_str(), - mirrored_request_body["item"]["content"][0]["data"].as_str(), + mirrored_request_body["handoff_id"].as_str(), + mirrored_request_body["output_text"].as_str(), ); assert_eq!( mirrored_request_body["type"].as_str(), - Some("conversation.item.create") + Some("conversation.handoff.append") ); assert_eq!( - mirrored_request_body["item"]["content"][0]["text"].as_str(), + mirrored_request_body["handoff_id"].as_str(), + Some("handoff_echo_guard") + ); + assert_eq!( + mirrored_request_body["output_text"].as_str(), Some("assistant says hi") ); @@ -875,7 +1155,7 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Result<()> { +async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> Result<()> { skip_if_no_network!(Ok(())); let (gate_completed_tx, gate_completed_rx) = oneshot::channel(); @@ -893,22 +1173,21 @@ async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Res let realtime_server = start_websocket_server(vec![vec![vec![ json!({ - "type": "session.created", - "session": { "id": "sess_non_blocking" } + "type": "session.updated", + "session": { "id": "sess_non_blocking", "instructions": "backend prompt" } }), json!({ - "type": "conversation.item.added", - "item": { - "type": "message", - "role": "assistant", - "content": [{"type": "text", "text": "delegate now"}] - } + "type": "conversation.handoff.requested", + "handoff_id": "handoff_non_blocking", + "item_id": "item_non_blocking", + "input_transcript": "delegate now", + "messages": [{"role": "user", "text": "delegate now"}] }), json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 24000, - "num_channels": 1 + "channels": 1 }), ]]]) .await; @@ -930,7 +1209,7 @@ async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Res let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) if session_id == "sess_non_blocking" => Some(()), _ => None, }) @@ -938,14 +1217,8 @@ async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Res let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::ConversationItemAdded(item), - }) => item - .get("content") - .and_then(Value::as_array) - .into_iter() - .flatten() - .any(|content| content.get("text").and_then(Value::as_str) == Some("delegate now")) - .then_some(()), + payload: RealtimeEvent::HandoffRequested(handoff), + }) if handoff.input_transcript == "delegate now" => Some(()), _ => None, }) .await; @@ -982,7 +1255,7 @@ async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Res } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn inbound_realtime_text_steers_active_turn() -> Result<()> { +async fn inbound_handoff_request_steers_active_turn() -> Result<()> { skip_if_no_network!(Ok(())); let (gate_completed_tx, gate_completed_rx) = oneshot::channel(); @@ -1027,17 +1300,15 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> { let realtime_server = start_websocket_server(vec![vec![ vec![json!({ - "type": "session.created", - "session": { "id": "sess_steer" } + "type": "session.updated", + "session": { "id": "sess_steer", "instructions": "backend prompt" } })], - vec![], vec![json!({ - "type": "conversation.item.added", - "item": { - "type": "message", - "role": "assistant", - "content": [{"type": "text", "text": "steer via realtime"}] - } + "type": "conversation.handoff.requested", + "handoff_id": "handoff_steer", + "item_id": "item_steer", + "input_transcript": "steer via realtime", + "messages": [{ "role": "user", "text": "steer via realtime" }] })], ]]) .await; @@ -1058,7 +1329,7 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> { .await?; let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, + payload: RealtimeEvent::SessionUpdated { session_id, .. }, }) if session_id == "sess_steer" => Some(()), _ => None, }) @@ -1092,16 +1363,8 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> { let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::ConversationItemAdded(item), - }) => item - .get("content") - .and_then(Value::as_array) - .into_iter() - .flatten() - .any(|content| { - content.get("text").and_then(Value::as_str) == Some("steer via realtime") - }) - .then_some(()), + payload: RealtimeEvent::HandoffRequested(handoff), + }) if handoff.input_transcript == "steer via realtime" => Some(()), _ => None, }) .await; @@ -1141,7 +1404,7 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio() -> Result<()> { +async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio() -> Result<()> { skip_if_no_network!(Ok(())); let (gate_completed_tx, gate_completed_rx) = oneshot::channel(); @@ -1157,33 +1420,24 @@ async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio( ]; let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await; - let delegated_text = "delegate from spawn transcript"; + let delegated_text = "delegate from handoff request"; let realtime_server = start_websocket_server(vec![vec![vec![ json!({ - "type": "session.created", - "session": { "id": "sess_spawn_transcript" } + "type": "session.updated", + "session": { "id": "sess_handoff_request", "instructions": "backend prompt" } }), json!({ - "type": "conversation.item.added", - "item": { - "type": "spawn_transcript", - "seq": 1, - "full_user_transcript": delegated_text, - "delta_user_transcript": delegated_text, - "backend_prompt_messages": [{ - "role": "user", - "channel": null, - "content": delegated_text, - "content_type": "text" - }], - "transcript_source": "backend_prompt_messages" - } + "type": "conversation.handoff.requested", + "handoff_id": "handoff_audio", + "item_id": "item_audio", + "input_transcript": delegated_text, + "messages": [{ "role": "user", "text": delegated_text }] }), json!({ - "type": "response.output_audio.delta", + "type": "conversation.output_audio.delta", "delta": "AQID", "sample_rate": 24000, - "num_channels": 1 + "channels": 1 }), ]]]) .await; @@ -1205,18 +1459,17 @@ async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio( let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::SessionCreated { session_id }, - }) if session_id == "sess_spawn_transcript" => Some(()), + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) if session_id == "sess_handoff_request" => Some(()), _ => None, }) .await; let _ = wait_for_event_match(&test.codex, |msg| match msg { EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::ConversationItemAdded(item), - }) => (item.get("type").and_then(Value::as_str) == Some("spawn_transcript") - && item.get("delta_user_transcript").and_then(Value::as_str) == Some(delegated_text)) - .then_some(()), + payload: RealtimeEvent::HandoffRequested(handoff), + }) => (handoff.handoff_id == "handoff_audio" && handoff.input_transcript == delegated_text) + .then_some(()), _ => None, }) .await; @@ -1231,7 +1484,7 @@ async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio( }), ) .await - .expect("timed out waiting for realtime audio after spawn_transcript"); + .expect("timed out waiting for realtime audio after handoff request"); assert_eq!(audio_out.data, "AQID"); let completion = completions diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 7e10951cd..671b734fa 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -116,12 +116,32 @@ pub struct RealtimeAudioFrame { pub samples_per_channel: Option, } +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct RealtimeHandoffMessage { + pub role: String, + pub text: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct RealtimeHandoffRequested { + pub handoff_id: String, + pub item_id: String, + pub input_transcript: String, + pub messages: Vec, +} + #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] pub enum RealtimeEvent { - SessionCreated { session_id: String }, - SessionUpdated { backend_prompt: Option }, + SessionUpdated { + session_id: String, + instructions: Option, + }, AudioOut(RealtimeAudioFrame), ConversationItemAdded(Value), + ConversationItemDone { + item_id: String, + }, + HandoffRequested(RealtimeHandoffRequested), Error(String), } diff --git a/codex-rs/tui/src/chatwidget/realtime.rs b/codex-rs/tui/src/chatwidget/realtime.rs index 98ac76a34..2970ac4d3 100644 --- a/codex-rs/tui/src/chatwidget/realtime.rs +++ b/codex-rs/tui/src/chatwidget/realtime.rs @@ -183,12 +183,13 @@ impl ChatWidget { ev: RealtimeConversationRealtimeEvent, ) { match ev.payload { - RealtimeEvent::SessionCreated { session_id } => { + RealtimeEvent::SessionUpdated { session_id, .. } => { self.realtime_conversation.session_id = Some(session_id); } - RealtimeEvent::SessionUpdated { .. } => {} RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame), RealtimeEvent::ConversationItemAdded(_item) => {} + RealtimeEvent::ConversationItemDone { .. } => {} + RealtimeEvent::HandoffRequested(_) => {} RealtimeEvent::Error(message) => { self.add_error_message(format!("Realtime voice error: {message}")); self.reset_realtime_conversation_state();