From 2e24be21346c5e2d415ab0ae10cd6dac854014d5 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 9 Mar 2026 22:30:03 -0700 Subject: [PATCH] Use realtime transcript for handoff context (#14132) - collect input/output transcript deltas into active handoff transcript state - attach and clear that transcript on each handoff, and regenerate schema/tests --- .../schema/json/EventMsg.json | 91 +++++--- .../codex_app_server_protocol.schemas.json | 91 +++++--- .../codex_app_server_protocol.v2.schemas.json | 91 +++++--- .../schema/typescript/RealtimeEvent.ts | 3 +- .../typescript/RealtimeHandoffRequested.ts | 4 +- ...fMessage.ts => RealtimeTranscriptDelta.ts} | 2 +- .../typescript/RealtimeTranscriptEntry.ts | 5 + .../schema/typescript/index.ts | 3 +- .../app-server/src/bespoke_event_handling.rs | 4 +- .../endpoint/realtime_websocket/methods.rs | 185 ++++++++++++++-- .../endpoint/realtime_websocket/protocol.rs | 25 ++- codex-rs/core/src/realtime_conversation.rs | 28 +-- .../core/tests/suite/realtime_conversation.rs | 207 +++++++++++++++--- codex-rs/protocol/src/protocol.rs | 11 +- codex-rs/tui/src/chatwidget/realtime.rs | 2 + 15 files changed, 596 insertions(+), 156 deletions(-) rename codex-rs/app-server-protocol/schema/typescript/{RealtimeHandoffMessage.ts => RealtimeTranscriptDelta.ts} (68%) create mode 100644 codex-rs/app-server-protocol/schema/typescript/RealtimeTranscriptEntry.ts diff --git a/codex-rs/app-server-protocol/schema/json/EventMsg.json b/codex-rs/app-server-protocol/schema/json/EventMsg.json index bda386119..845c5eb48 100644 --- a/codex-rs/app-server-protocol/schema/json/EventMsg.json +++ b/codex-rs/app-server-protocol/schema/json/EventMsg.json @@ -4671,6 +4671,32 @@ "title": "SessionUpdatedRealtimeEvent", "type": "object" }, + { + "additionalProperties": false, + "properties": { + "InputTranscriptDelta": { + "$ref": "#/definitions/RealtimeTranscriptDelta" + } + }, + "required": [ + "InputTranscriptDelta" + ], + "title": 
"InputTranscriptDeltaRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "OutputTranscriptDelta": { + "$ref": "#/definitions/RealtimeTranscriptDelta" + } + }, + "required": [ + "OutputTranscriptDelta" + ], + "title": "OutputTranscriptDeltaRealtimeEvent", + "type": "object" + }, { "additionalProperties": false, "properties": { @@ -4744,7 +4770,44 @@ } ] }, - "RealtimeHandoffMessage": { + "RealtimeHandoffRequested": { + "properties": { + "active_transcript": { + "items": { + "$ref": "#/definitions/RealtimeTranscriptEntry" + }, + "type": "array" + }, + "handoff_id": { + "type": "string" + }, + "input_transcript": { + "type": "string" + }, + "item_id": { + "type": "string" + } + }, + "required": [ + "active_transcript", + "handoff_id", + "input_transcript", + "item_id" + ], + "type": "object" + }, + "RealtimeTranscriptDelta": { + "properties": { + "delta": { + "type": "string" + } + }, + "required": [ + "delta" + ], + "type": "object" + }, + "RealtimeTranscriptEntry": { "properties": { "role": { "type": "string" @@ -4759,32 +4822,6 @@ ], "type": "object" }, - "RealtimeHandoffRequested": { - "properties": { - "handoff_id": { - "type": "string" - }, - "input_transcript": { - "type": "string" - }, - "item_id": { - "type": "string" - }, - "messages": { - "items": { - "$ref": "#/definitions/RealtimeHandoffMessage" - }, - "type": "array" - } - }, - "required": [ - "handoff_id", - "input_transcript", - "item_id", - "messages" - ], - "type": "object" - }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index beeb5a506..228a49d35 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ 
b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -6684,6 +6684,32 @@ "title": "SessionUpdatedRealtimeEvent", "type": "object" }, + { + "additionalProperties": false, + "properties": { + "InputTranscriptDelta": { + "$ref": "#/definitions/RealtimeTranscriptDelta" + } + }, + "required": [ + "InputTranscriptDelta" + ], + "title": "InputTranscriptDeltaRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "OutputTranscriptDelta": { + "$ref": "#/definitions/RealtimeTranscriptDelta" + } + }, + "required": [ + "OutputTranscriptDelta" + ], + "title": "OutputTranscriptDeltaRealtimeEvent", + "type": "object" + }, { "additionalProperties": false, "properties": { @@ -6757,7 +6783,44 @@ } ] }, - "RealtimeHandoffMessage": { + "RealtimeHandoffRequested": { + "properties": { + "active_transcript": { + "items": { + "$ref": "#/definitions/RealtimeTranscriptEntry" + }, + "type": "array" + }, + "handoff_id": { + "type": "string" + }, + "input_transcript": { + "type": "string" + }, + "item_id": { + "type": "string" + } + }, + "required": [ + "active_transcript", + "handoff_id", + "input_transcript", + "item_id" + ], + "type": "object" + }, + "RealtimeTranscriptDelta": { + "properties": { + "delta": { + "type": "string" + } + }, + "required": [ + "delta" + ], + "type": "object" + }, + "RealtimeTranscriptEntry": { "properties": { "role": { "type": "string" @@ -6772,32 +6835,6 @@ ], "type": "object" }, - "RealtimeHandoffRequested": { - "properties": { - "handoff_id": { - "type": "string" - }, - "input_transcript": { - "type": "string" - }, - "item_id": { - "type": "string" - }, - "messages": { - "items": { - "$ref": "#/definitions/RealtimeHandoffMessage" - }, - "type": "array" - } - }, - "required": [ - "handoff_id", - "input_transcript", - "item_id", - "messages" - ], - "type": "object" - }, "RejectConfig": { "properties": { "mcp_elicitations": { diff --git 
a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 7c2d5961d..b5fdebe7d 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -9643,6 +9643,32 @@ "title": "SessionUpdatedRealtimeEvent", "type": "object" }, + { + "additionalProperties": false, + "properties": { + "InputTranscriptDelta": { + "$ref": "#/definitions/RealtimeTranscriptDelta" + } + }, + "required": [ + "InputTranscriptDelta" + ], + "title": "InputTranscriptDeltaRealtimeEvent", + "type": "object" + }, + { + "additionalProperties": false, + "properties": { + "OutputTranscriptDelta": { + "$ref": "#/definitions/RealtimeTranscriptDelta" + } + }, + "required": [ + "OutputTranscriptDelta" + ], + "title": "OutputTranscriptDeltaRealtimeEvent", + "type": "object" + }, { "additionalProperties": false, "properties": { @@ -9716,7 +9742,44 @@ } ] }, - "RealtimeHandoffMessage": { + "RealtimeHandoffRequested": { + "properties": { + "active_transcript": { + "items": { + "$ref": "#/definitions/RealtimeTranscriptEntry" + }, + "type": "array" + }, + "handoff_id": { + "type": "string" + }, + "input_transcript": { + "type": "string" + }, + "item_id": { + "type": "string" + } + }, + "required": [ + "active_transcript", + "handoff_id", + "input_transcript", + "item_id" + ], + "type": "object" + }, + "RealtimeTranscriptDelta": { + "properties": { + "delta": { + "type": "string" + } + }, + "required": [ + "delta" + ], + "type": "object" + }, + "RealtimeTranscriptEntry": { "properties": { "role": { "type": "string" @@ -9731,32 +9794,6 @@ ], "type": "object" }, - "RealtimeHandoffRequested": { - "properties": { - "handoff_id": { - "type": "string" - }, - "input_transcript": { - "type": "string" - }, - "item_id": { - "type": "string" - }, - "messages": { - "items": { - "$ref": 
"#/definitions/RealtimeHandoffMessage" - }, - "type": "array" - } - }, - "required": [ - "handoff_id", - "input_transcript", - "item_id", - "messages" - ], - "type": "object" - }, "ReasoningEffort": { "description": "See https://platform.openai.com/docs/guides/reasoning?api-mode=responses#get-started-with-reasoning", "enum": [ diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts index 8286f34ca..490400b4e 100644 --- a/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeEvent.ts @@ -3,6 +3,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { RealtimeAudioFrame } from "./RealtimeAudioFrame"; import type { RealtimeHandoffRequested } from "./RealtimeHandoffRequested"; +import type { RealtimeTranscriptDelta } from "./RealtimeTranscriptDelta"; import type { JsonValue } from "./serde_json/JsonValue"; -export type RealtimeEvent = { "SessionUpdated": { session_id: string, instructions: string | null, } } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "ConversationItemDone": { item_id: string, } } | { "HandoffRequested": RealtimeHandoffRequested } | { "Error": string }; +export type RealtimeEvent = { "SessionUpdated": { session_id: string, instructions: string | null, } } | { "InputTranscriptDelta": RealtimeTranscriptDelta } | { "OutputTranscriptDelta": RealtimeTranscriptDelta } | { "AudioOut": RealtimeAudioFrame } | { "ConversationItemAdded": JsonValue } | { "ConversationItemDone": { item_id: string, } } | { "HandoffRequested": RealtimeHandoffRequested } | { "Error": string }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffRequested.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffRequested.ts index f3426706a..5fbe23791 100644 --- 
a/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffRequested.ts +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffRequested.ts @@ -1,6 +1,6 @@ // GENERATED CODE! DO NOT MODIFY BY HAND! // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -import type { RealtimeHandoffMessage } from "./RealtimeHandoffMessage"; +import type { RealtimeTranscriptEntry } from "./RealtimeTranscriptEntry"; -export type RealtimeHandoffRequested = { handoff_id: string, item_id: string, input_transcript: string, messages: Array, }; +export type RealtimeHandoffRequested = { handoff_id: string, item_id: string, input_transcript: string, active_transcript: Array, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffMessage.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeTranscriptDelta.ts similarity index 68% rename from codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffMessage.ts rename to codex-rs/app-server-protocol/schema/typescript/RealtimeTranscriptDelta.ts index 39b77cfdf..99cf24f77 100644 --- a/codex-rs/app-server-protocol/schema/typescript/RealtimeHandoffMessage.ts +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeTranscriptDelta.ts @@ -2,4 +2,4 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -export type RealtimeHandoffMessage = { role: string, text: string, }; +export type RealtimeTranscriptDelta = { delta: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/RealtimeTranscriptEntry.ts b/codex-rs/app-server-protocol/schema/typescript/RealtimeTranscriptEntry.ts new file mode 100644 index 000000000..e7420f3c7 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/RealtimeTranscriptEntry.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). 
Do not edit this file manually. + +export type RealtimeTranscriptEntry = { role: string, text: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/index.ts b/codex-rs/app-server-protocol/schema/typescript/index.ts index 6560ffd5d..a7b38b044 100644 --- a/codex-rs/app-server-protocol/schema/typescript/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/index.ts @@ -152,8 +152,9 @@ export type { RealtimeConversationClosedEvent } from "./RealtimeConversationClos export type { RealtimeConversationRealtimeEvent } from "./RealtimeConversationRealtimeEvent"; export type { RealtimeConversationStartedEvent } from "./RealtimeConversationStartedEvent"; export type { RealtimeEvent } from "./RealtimeEvent"; -export type { RealtimeHandoffMessage } from "./RealtimeHandoffMessage"; export type { RealtimeHandoffRequested } from "./RealtimeHandoffRequested"; +export type { RealtimeTranscriptDelta } from "./RealtimeTranscriptDelta"; +export type { RealtimeTranscriptEntry } from "./RealtimeTranscriptEntry"; export type { ReasoningContentDeltaEvent } from "./ReasoningContentDeltaEvent"; export type { ReasoningEffort } from "./ReasoningEffort"; export type { ReasoningItem } from "./ReasoningItem"; diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index d442f3eca..85ca56c91 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -275,6 +275,8 @@ pub(crate) async fn apply_bespoke_event_handling( if let ApiVersion::V2 = api_version { match event.payload { RealtimeEvent::SessionUpdated { .. 
} => {} + RealtimeEvent::InputTranscriptDelta(_) => {} + RealtimeEvent::OutputTranscriptDelta(_) => {} RealtimeEvent::AudioOut(audio) => { let notification = ThreadRealtimeOutputAudioDeltaNotification { thread_id: conversation_id.to_string(), @@ -306,7 +308,7 @@ pub(crate) async fn apply_bespoke_event_handling( "handoff_id": handoff.handoff_id, "item_id": handoff.item_id, "input_transcript": handoff.input_transcript, - "messages": handoff.messages, + "active_transcript": handoff.active_transcript, }), }; outgoing diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index bc9551135..60cb5d2c3 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -4,6 +4,8 @@ use crate::endpoint::realtime_websocket::protocol::RealtimeAudioFrame; use crate::endpoint::realtime_websocket::protocol::RealtimeEvent; use crate::endpoint::realtime_websocket::protocol::RealtimeOutboundMessage; use crate::endpoint::realtime_websocket::protocol::RealtimeSessionConfig; +use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta; +use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry; use crate::endpoint::realtime_websocket::protocol::SessionAudio; use crate::endpoint::realtime_websocket::protocol::SessionAudioFormat; use crate::endpoint::realtime_websocket::protocol::SessionAudioInput; @@ -198,9 +200,15 @@ pub struct RealtimeWebsocketWriter { #[derive(Clone)] pub struct RealtimeWebsocketEvents { rx_message: Arc>>>, + active_transcript: Arc>, is_closed: Arc, } +#[derive(Default)] +struct ActiveTranscriptState { + entries: Vec, +} + impl RealtimeWebsocketConnection { pub async fn send_audio_frame(&self, frame: RealtimeAudioFrame) -> Result<(), ApiError> { self.writer.send_audio_frame(frame).await @@ -249,6 +257,7 @@ impl RealtimeWebsocketConnection { }, events: RealtimeWebsocketEvents { 
rx_message: Arc::new(Mutex::new(rx_message)), + active_transcript: Arc::new(Mutex::new(ActiveTranscriptState::default())), is_closed, }, } @@ -366,7 +375,8 @@ impl RealtimeWebsocketEvents { match msg { Message::Text(text) => { - if let Some(event) = parse_realtime_event(&text) { + if let Some(mut event) = parse_realtime_event(&text) { + self.update_active_transcript(&mut event).await; debug!(?event, "realtime websocket parsed event"); return Ok(Some(event)); } @@ -390,6 +400,44 @@ impl RealtimeWebsocketEvents { } } } + + async fn update_active_transcript(&self, event: &mut RealtimeEvent) { + let mut active_transcript = self.active_transcript.lock().await; + match event { + RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta }) => { + append_transcript_delta(&mut active_transcript.entries, "user", delta); + } + RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta }) => { + append_transcript_delta(&mut active_transcript.entries, "assistant", delta); + } + RealtimeEvent::HandoffRequested(handoff) => { + handoff.active_transcript = std::mem::take(&mut active_transcript.entries); + } + RealtimeEvent::SessionUpdated { .. } + | RealtimeEvent::AudioOut(_) + | RealtimeEvent::ConversationItemAdded(_) + | RealtimeEvent::ConversationItemDone { .. 
} + | RealtimeEvent::Error(_) => {} + } + } +} + +fn append_transcript_delta(entries: &mut Vec, role: &str, delta: &str) { + if delta.is_empty() { + return; + } + + if let Some(last_entry) = entries.last_mut() + && last_entry.role == role + { + last_entry.text.push_str(delta); + return; + } + + entries.push(RealtimeTranscriptEntry { + role: role.to_string(), + text: delta.to_string(), + }); } pub struct RealtimeWebsocketClient { @@ -558,8 +606,9 @@ fn normalize_realtime_path(url: &mut Url) { #[cfg(test)] mod tests { use super::*; - use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffMessage; use crate::endpoint::realtime_websocket::protocol::RealtimeHandoffRequested; + use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptDelta; + use crate::endpoint::realtime_websocket::protocol::RealtimeTranscriptEntry; use http::HeaderValue; use pretty_assertions::assert_eq; use serde_json::Value; @@ -644,10 +693,7 @@ mod tests { "type": "conversation.handoff.requested", "handoff_id": "handoff_123", "item_id": "item_123", - "input_transcript": "delegate this", - "messages": [ - {"role": "user", "text": "delegate this"} - ] + "input_transcript": "delegate this" }) .to_string(); @@ -657,14 +703,47 @@ mod tests { handoff_id: "handoff_123".to_string(), item_id: "item_123".to_string(), input_transcript: "delegate this".to_string(), - messages: vec![RealtimeHandoffMessage { - role: "user".to_string(), - text: "delegate this".to_string(), - }], + active_transcript: Vec::new(), })) ); } + #[test] + fn parse_input_transcript_delta_event() { + let payload = json!({ + "type": "conversation.input_transcript.delta", + "delta": "hello " + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str()), + Some(RealtimeEvent::InputTranscriptDelta( + RealtimeTranscriptDelta { + delta: "hello ".to_string(), + } + )) + ); + } + + #[test] + fn parse_output_transcript_delta_event() { + let payload = json!({ + "type": "conversation.output_transcript.delta", + 
"delta": "hi" + }) + .to_string(); + + assert_eq!( + parse_realtime_event(payload.as_str()), + Some(RealtimeEvent::OutputTranscriptDelta( + RealtimeTranscriptDelta { + delta: "hi".to_string(), + } + )) + ); + } + #[test] fn merge_request_headers_matches_http_precedence() { let mut provider_headers = HeaderMap::new(); @@ -853,13 +932,45 @@ mod tests { .await .expect("send audio"); + ws.send(Message::Text( + json!({ + "type": "conversation.input_transcript.delta", + "delta": "delegate " + }) + .to_string() + .into(), + )) + .await + .expect("send input transcript delta"); + + ws.send(Message::Text( + json!({ + "type": "conversation.input_transcript.delta", + "delta": "now" + }) + .to_string() + .into(), + )) + .await + .expect("send input transcript delta"); + + ws.send(Message::Text( + json!({ + "type": "conversation.output_transcript.delta", + "delta": "working" + }) + .to_string() + .into(), + )) + .await + .expect("send output transcript delta"); + ws.send(Message::Text( json!({ "type": "conversation.handoff.requested", "handoff_id": "handoff_1", "item_id": "item_2", - "input_transcript": "delegate now", - "messages": [{"role": "user", "text": "delegate now"}] + "input_transcript": "delegate now" }) .to_string() .into(), @@ -945,6 +1056,42 @@ mod tests { }) ); + let input_delta_event = connection + .next_event() + .await + .expect("next event") + .expect("event"); + assert_eq!( + input_delta_event, + RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { + delta: "delegate ".to_string(), + }) + ); + + let input_delta_event = connection + .next_event() + .await + .expect("next event") + .expect("event"); + assert_eq!( + input_delta_event, + RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { + delta: "now".to_string(), + }) + ); + + let output_delta_event = connection + .next_event() + .await + .expect("next event") + .expect("event"); + assert_eq!( + output_delta_event, + RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { + delta: 
"working".to_string(), + }) + ); + let added_event = connection .next_event() .await @@ -956,10 +1103,16 @@ mod tests { handoff_id: "handoff_1".to_string(), item_id: "item_2".to_string(), input_transcript: "delegate now".to_string(), - messages: vec![RealtimeHandoffMessage { - role: "user".to_string(), - text: "delegate now".to_string(), - }], + active_transcript: vec![ + RealtimeTranscriptEntry { + role: "user".to_string(), + text: "delegate now".to_string(), + }, + RealtimeTranscriptEntry { + role: "assistant".to_string(), + text: "working".to_string(), + }, + ], }) ); diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs index 82957c5ec..7967d5999 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/protocol.rs @@ -1,7 +1,8 @@ pub use codex_protocol::protocol::RealtimeAudioFrame; pub use codex_protocol::protocol::RealtimeEvent; -pub use codex_protocol::protocol::RealtimeHandoffMessage; pub use codex_protocol::protocol::RealtimeHandoffRequested; +pub use codex_protocol::protocol::RealtimeTranscriptDelta; +pub use codex_protocol::protocol::RealtimeTranscriptEntry; use serde::Serialize; use serde_json::Value; use tracing::debug; @@ -135,6 +136,16 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option { .and_then(|v| u32::try_from(v).ok()), })) } + "conversation.input_transcript.delta" => parsed + .get("delta") + .and_then(Value::as_str) + .map(str::to_string) + .map(|delta| RealtimeEvent::InputTranscriptDelta(RealtimeTranscriptDelta { delta })), + "conversation.output_transcript.delta" => parsed + .get("delta") + .and_then(Value::as_str) + .map(str::to_string) + .map(|delta| RealtimeEvent::OutputTranscriptDelta(RealtimeTranscriptDelta { delta })), "conversation.item.added" => parsed .get("item") .cloned() @@ -159,21 +170,11 @@ pub(super) fn parse_realtime_event(payload: &str) -> Option { 
.get("input_transcript") .and_then(Value::as_str) .map(str::to_string)?; - let messages = parsed - .get("messages") - .and_then(Value::as_array)? - .iter() - .filter_map(|message| { - let role = message.get("role").and_then(Value::as_str)?.to_string(); - let text = message.get("text").and_then(Value::as_str)?.to_string(); - Some(RealtimeHandoffMessage { role, text }) - }) - .collect(); Some(RealtimeEvent::HandoffRequested(RealtimeHandoffRequested { handoff_id, item_id, input_transcript, - messages, + active_transcript: Vec::new(), })) } "error" => parsed diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 8d69b6b9e..3baea265a 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -394,15 +394,17 @@ pub(crate) async fn handle_audio( } fn realtime_text_from_handoff_request(handoff: &RealtimeHandoffRequested) -> Option { - let messages = handoff - .messages + let active_transcript = handoff + .active_transcript .iter() - .map(|message| format!("{}: {}", message.role, message.text)) + .map(|entry| format!("{}: {}", entry.role, entry.text)) .collect::>() .join("\n"); - (!messages.is_empty()).then_some(messages).or_else(|| { - (!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone()) - }) + (!active_transcript.is_empty()) + .then_some(active_transcript) + .or_else(|| { + (!handoff.input_transcript.is_empty()).then(|| handoff.input_transcript.clone()) + }) } fn realtime_api_key( @@ -603,22 +605,22 @@ mod tests { use super::RealtimeHandoffState; use super::realtime_text_from_handoff_request; use async_channel::bounded; - use codex_protocol::protocol::RealtimeHandoffMessage; use codex_protocol::protocol::RealtimeHandoffRequested; + use codex_protocol::protocol::RealtimeTranscriptEntry; use pretty_assertions::assert_eq; #[test] - fn extracts_text_from_handoff_request_messages() { + fn extracts_text_from_handoff_request_active_transcript() { let 
handoff = RealtimeHandoffRequested { handoff_id: "handoff_1".to_string(), item_id: "item_1".to_string(), input_transcript: "ignored".to_string(), - messages: vec![ - RealtimeHandoffMessage { + active_transcript: vec![ + RealtimeTranscriptEntry { role: "user".to_string(), text: "hello".to_string(), }, - RealtimeHandoffMessage { + RealtimeTranscriptEntry { role: "assistant".to_string(), text: "hi there".to_string(), }, @@ -636,7 +638,7 @@ mod tests { handoff_id: "handoff_1".to_string(), item_id: "item_1".to_string(), input_transcript: "ignored".to_string(), - messages: vec![], + active_transcript: vec![], }; assert_eq!( realtime_text_from_handoff_request(&handoff), @@ -650,7 +652,7 @@ mod tests { handoff_id: "handoff_1".to_string(), item_id: "item_1".to_string(), input_transcript: String::new(), - messages: vec![], + active_transcript: vec![], }; assert_eq!(realtime_text_from_handoff_request(&handoff), None); } diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 6313616e3..0d49f8c8d 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -970,12 +970,15 @@ async fn conversation_mirrors_assistant_message_text_to_realtime_handoff() -> Re "type": "session.updated", "session": { "id": "sess_1", "instructions": "backend prompt" } }), + json!({ + "type": "conversation.input_transcript.delta", + "delta": "delegate hello" + }), json!({ "type": "conversation.handoff.requested", "handoff_id": "handoff_1", "item_id": "item_1", - "input_transcript": "delegate hello", - "messages": [{ "role": "user", "text": "delegate hello" }] + "input_transcript": "delegate hello" }), ], vec![], @@ -1089,12 +1092,15 @@ async fn conversation_handoff_persists_across_item_done_until_turn_complete() -> "type": "session.updated", "session": { "id": "sess_item_done", "instructions": "backend prompt" } }), + json!({ + "type": "conversation.input_transcript.delta", 
+ "delta": "delegate now" + }), json!({ "type": "conversation.handoff.requested", "handoff_id": "handoff_item_done", "item_id": "item_item_done", - "input_transcript": "delegate now", - "messages": [{ "role": "user", "text": "delegate now" }] + "input_transcript": "delegate now" }), ], vec![json!({ @@ -1229,12 +1235,15 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> { "type": "session.updated", "session": { "id": "sess_inbound", "instructions": "backend prompt" } }), + json!({ + "type": "conversation.input_transcript.delta", + "delta": "text from realtime" + }), json!({ "type": "conversation.handoff.requested", "handoff_id": "handoff_inbound", "item_id": "item_inbound", - "input_transcript": "text from realtime", - "messages": [{ "role": "user", "text": "text from realtime" }] + "input_transcript": "text from realtime" }), ]]]) .await; @@ -1293,7 +1302,7 @@ async fn inbound_handoff_request_starts_turn() -> Result<()> { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn inbound_handoff_request_uses_all_messages() -> Result<()> { +async fn inbound_handoff_request_uses_active_transcript() -> Result<()> { skip_if_no_network!(Ok(())); let api_server = start_mock_server().await; @@ -1312,16 +1321,23 @@ async fn inbound_handoff_request_uses_all_messages() -> Result<()> { "type": "session.updated", "session": { "id": "sess_inbound_multi", "instructions": "backend prompt" } }), + json!({ + "type": "conversation.output_transcript.delta", + "delta": "assistant context" + }), + json!({ + "type": "conversation.input_transcript.delta", + "delta": "delegated query" + }), + json!({ + "type": "conversation.output_transcript.delta", + "delta": "assist confirm" + }), json!({ "type": "conversation.handoff.requested", "handoff_id": "handoff_inbound_multi", "item_id": "item_inbound_multi", - "input_transcript": "ignored", - "messages": [ - { "role": "assistant", "text": "assistant context" }, - { "role": "user", "text": "delegated query" }, - { 
"role": "assistant", "text": "assist confirm" }, - ] + "input_transcript": "ignored" }), ]]]) .await; @@ -1363,6 +1379,131 @@ async fn inbound_handoff_request_uses_all_messages() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn inbound_handoff_request_clears_active_transcript_after_each_handoff() -> Result<()> { + skip_if_no_network!(Ok(())); + + let api_server = start_mock_server().await; + let response_mock = responses::mount_sse_sequence( + &api_server, + vec![ + responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "first ok"), + responses::ev_completed("resp-1"), + ]), + responses::sse(vec![ + responses::ev_response_created("resp-2"), + responses::ev_assistant_message("msg-2", "second ok"), + responses::ev_completed("resp-2"), + ]), + ], + ) + .await; + + let realtime_server = start_websocket_server(vec![vec![ + vec![ + json!({ + "type": "session.updated", + "session": { "id": "sess_inbound_clear", "instructions": "backend prompt" } + }), + json!({ + "type": "conversation.input_transcript.delta", + "delta": "first question" + }), + json!({ + "type": "conversation.handoff.requested", + "handoff_id": "handoff_inbound_clear_1", + "item_id": "item_inbound_clear_1", + "input_transcript": "first question" + }), + ], + vec![], + vec![ + json!({ + "type": "conversation.input_transcript.delta", + "delta": "second question" + }), + json!({ + "type": "conversation.handoff.requested", + "handoff_id": "handoff_inbound_clear_2", + "item_id": "item_inbound_clear_2", + "input_transcript": "second question" + }), + ], + ]]) + .await; + + let mut builder = test_codex().with_config({ + let realtime_base_url = realtime_server.uri().to_string(); + move |config| { + config.experimental_realtime_ws_base_url = Some(realtime_base_url); + } + }); + let test = builder.build(&api_server).await?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: 
"backend prompt".to_string(), + session_id: None, + })) + .await?; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionUpdated { session_id, .. }, + }) => Some(session_id.clone()), + _ => None, + }) + .await; + + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + test.codex + .submit(Op::RealtimeConversationAudio(ConversationAudioParams { + frame: RealtimeAudioFrame { + data: "AQID".to_string(), + sample_rate: 24000, + num_channels: 1, + samples_per_channel: Some(480), + }, + })) + .await?; + + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + let requests = response_mock.requests(); + assert_eq!(requests.len(), 2); + + let first_user_texts = requests[0].message_input_texts("user"); + assert!( + first_user_texts + .iter() + .any(|text| text == "user: first question") + ); + + let second_user_texts = requests[1].message_input_texts("user"); + assert!( + second_user_texts + .iter() + .any(|text| text == "user: second question") + ); + assert!( + !second_user_texts + .iter() + .any(|text| text == "user: first question\nuser: second question") + ); + + realtime_server.shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn inbound_conversation_item_does_not_start_turn_and_still_forwards_audio() -> Result<()> { skip_if_no_network!(Ok(())); @@ -1473,12 +1614,15 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au "type": "session.updated", "session": { "id": "sess_echo_guard", "instructions": "backend prompt" } }), + json!({ + "type": "conversation.input_transcript.delta", + "delta": "delegate now" + }), json!({ "type": "conversation.handoff.requested", "handoff_id": "handoff_echo_guard", "item_id": "item_echo_guard", - "input_transcript": "delegate now", - "messages": 
[{"role": "user", "text": "delegate now"}] + "input_transcript": "delegate now" }), ], vec![ @@ -1621,12 +1765,15 @@ async fn inbound_handoff_request_does_not_block_realtime_event_forwarding() -> R "type": "session.updated", "session": { "id": "sess_non_blocking", "instructions": "backend prompt" } }), + json!({ + "type": "conversation.input_transcript.delta", + "delta": "delegate now" + }), json!({ "type": "conversation.handoff.requested", "handoff_id": "handoff_non_blocking", "item_id": "item_non_blocking", - "input_transcript": "delegate now", - "messages": [{"role": "user", "text": "delegate now"}] + "input_transcript": "delegate now" }), json!({ "type": "conversation.output_audio.delta", @@ -1748,13 +1895,18 @@ async fn inbound_handoff_request_steers_active_turn() -> Result<()> { "type": "session.updated", "session": { "id": "sess_steer", "instructions": "backend prompt" } })], - vec![json!({ - "type": "conversation.handoff.requested", - "handoff_id": "handoff_steer", - "item_id": "item_steer", - "input_transcript": "steer via realtime", - "messages": [{ "role": "user", "text": "steer via realtime" }] - })], + vec![ + json!({ + "type": "conversation.input_transcript.delta", + "delta": "steer via realtime" + }), + json!({ + "type": "conversation.handoff.requested", + "handoff_id": "handoff_steer", + "item_id": "item_steer", + "input_transcript": "steer via realtime" + }), + ], ]]) .await; @@ -1879,12 +2031,15 @@ async fn inbound_handoff_request_starts_turn_and_does_not_block_realtime_audio() "type": "session.updated", "session": { "id": "sess_handoff_request", "instructions": "backend prompt" } }), + json!({ + "type": "conversation.input_transcript.delta", + "delta": delegated_text + }), json!({ "type": "conversation.handoff.requested", "handoff_id": "handoff_audio", "item_id": "item_audio", - "input_transcript": delegated_text, - "messages": [{ "role": "user", "text": delegated_text }] + "input_transcript": delegated_text }), json!({ "type": 
"conversation.output_audio.delta", diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 633e530d2..98ff8f7f5 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -132,7 +132,12 @@ pub struct RealtimeAudioFrame { } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] -pub struct RealtimeHandoffMessage { +pub struct RealtimeTranscriptDelta { + pub delta: String, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] +pub struct RealtimeTranscriptEntry { pub role: String, pub text: String, } @@ -142,7 +147,7 @@ pub struct RealtimeHandoffRequested { pub handoff_id: String, pub item_id: String, pub input_transcript: String, - pub messages: Vec<RealtimeHandoffMessage>, + pub active_transcript: Vec<RealtimeTranscriptEntry>, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq, JsonSchema, TS)] @@ -151,6 +156,8 @@ pub enum RealtimeEvent { session_id: String, instructions: Option<String>, }, + InputTranscriptDelta(RealtimeTranscriptDelta), + OutputTranscriptDelta(RealtimeTranscriptDelta), AudioOut(RealtimeAudioFrame), ConversationItemAdded(Value), ConversationItemDone { diff --git a/codex-rs/tui/src/chatwidget/realtime.rs b/codex-rs/tui/src/chatwidget/realtime.rs index 32f0a1528..4e4f2f0e7 100644 --- a/codex-rs/tui/src/chatwidget/realtime.rs +++ b/codex-rs/tui/src/chatwidget/realtime.rs @@ -264,6 +264,8 @@ impl ChatWidget { RealtimeEvent::SessionUpdated { session_id, .. } => { self.realtime_conversation.session_id = Some(session_id); } + RealtimeEvent::InputTranscriptDelta(_) => {} + RealtimeEvent::OutputTranscriptDelta(_) => {} RealtimeEvent::AudioOut(frame) => self.enqueue_realtime_audio_out(&frame), RealtimeEvent::ConversationItemAdded(_item) => {} RealtimeEvent::ConversationItemDone { .. } => {}