From 10a3adad8ee8d2cc5a22d0d85622d9ea84d2989f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Mon, 23 Feb 2026 14:39:07 -0800 Subject: [PATCH] Handle realtime spawn_transcript delegation (#12619) --- codex-rs/core/src/realtime_conversation.rs | 62 ++- .../core/tests/suite/realtime_conversation.rs | 437 +++++++++++++++++- 2 files changed, 481 insertions(+), 18 deletions(-) diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 7187214fb..47a4354e2 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -217,7 +217,8 @@ pub(crate) async fn handle_start( _ => None, }; if let Some(text) = maybe_routed_text { - sess_clone.route_realtime_text_input(text).await; + let sess_for_routed_text = Arc::clone(&sess_clone); + sess_for_routed_text.route_realtime_text_input(text).await; } sess_clone .send_event_raw(ev(EventMsg::RealtimeConversationRealtime( @@ -252,16 +253,25 @@ pub(crate) async fn handle_audio( } fn realtime_text_from_conversation_item(item: &Value) -> Option { - if item.get("type").and_then(Value::as_str) != Some("message") { - return None; + match item.get("type").and_then(Value::as_str) { + Some("message") => { + if item.get("role").and_then(Value::as_str) != Some("assistant") { + return None; + } + let content = item.get("content")?.as_array()?; + let text = content + .iter() + .filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text")) + .filter_map(|entry| entry.get("text").and_then(Value::as_str)) + .collect::(); + if text.is_empty() { None } else { Some(text) } + } + Some("spawn_transcript") => item + .get("delta_user_transcript") + .and_then(Value::as_str) + .and_then(|text| (!text.is_empty()).then(|| text.to_string())), + Some(_) | None => None, } - let content = item.get("content")?.as_array()?; - let text = content - .iter() - .filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text")) - .filter_map(|entry| entry.get("text").and_then(Value::as_str)) - .collect::(); - if text.is_empty() { None } else { Some(text) } } pub(crate) async fn handle_text( @@ -301,7 +311,6 @@ fn spawn_realtime_input_task( tokio::spawn(async move { loop { tokio::select! { - biased; text = text_rx.recv() => { match text { Ok(text) => { @@ -388,7 +397,7 @@ mod tests { use serde_json::json; #[test] - fn extracts_text_from_message_items_ignoring_role() { + fn extracts_text_from_assistant_message_items_only() { let assistant = json!({ "type": "message", "role": "assistant", @@ -404,16 +413,14 @@ mod tests { "role": "user", "content": [{"type": "text", "text": "world"}], }); - assert_eq!( - realtime_text_from_conversation_item(&user), - Some("world".to_string()) - ); + assert_eq!(realtime_text_from_conversation_item(&user), None); } #[test] fn extracts_and_concatenates_text_entries_only() { let item = json!({ "type": "message", + "role": "assistant", "content": [ {"type": "text", "text": "a"}, {"type": "ignored", "text": "x"}, @@ -436,8 +443,31 @@ mod tests { let no_text = json!({ "type": "message", + "role": "assistant", "content": [{"type": "other", "value": 1}], }); assert_eq!(realtime_text_from_conversation_item(&no_text), None); + + let empty_spawn_transcript = json!({ + "type": "spawn_transcript", + "delta_user_transcript": "", + }); + assert_eq!( + realtime_text_from_conversation_item(&empty_spawn_transcript), + None + ); + } + + #[test] + fn extracts_text_from_spawn_transcript_items() { + let item = json!({ + "type": "spawn_transcript", + "delta_user_transcript": "delegate from transcript", + "backend_prompt_messages": [{"role": "user", "content": "delegate from transcript"}], + }); + assert_eq!( + realtime_text_from_conversation_item(&item), + Some("delegate from transcript".to_string()) + ); } } diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 59e40c016..98848e911 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -566,7 +566,7 @@ fn message_input_texts(body: &Value, role: &str) -> Vec { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn inbound_realtime_text_starts_turn_and_ignores_role() -> Result<()> { +async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> { skip_if_no_network!(Ok(())); let api_server = start_mock_server().await; @@ -589,7 +589,7 @@ async fn inbound_realtime_text_starts_turn_and_ignores_role() -> Result<()> { "type": "conversation.item.added", "item": { "type": "message", - "role": "user", + "role": "assistant", "content": [{"type": "text", "text": "text from realtime"}] } }), @@ -633,6 +633,321 @@ async fn inbound_realtime_text_starts_turn_and_ignores_role() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> Result<()> { + skip_if_no_network!(Ok(())); + + let api_server = start_mock_server().await; + + let realtime_server = start_websocket_server(vec![vec![vec![ + json!({ + "type": "session.created", + "session": { "id": "sess_ignore_user_role" } + }), + json!({ + "type": "conversation.item.added", + "item": { + "type": "message", + "role": "user", + "content": [{"type": "text", "text": "echoed local text"}] + } + }), + json!({ + "type": "response.output_audio.delta", + "delta": "AQID", + "sample_rate": 24000, + "num_channels": 1 + }), + ]]]) + .await; + + let mut builder = test_codex().with_config({ + let realtime_base_url = realtime_server.uri().to_string(); + move |config| { + config.experimental_realtime_ws_base_url = Some(realtime_base_url); + } + }); + let test = builder.build(&api_server).await?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) if session_id == "sess_ignore_user_role" => Some(()), + _ => None, + }) + .await; + + let audio_out = tokio::time::timeout( + Duration::from_millis(500), + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::AudioOut(frame), + }) => Some(frame.clone()), + _ => None, + }), + ) + .await + .expect("timed out waiting for realtime audio after user-role conversation item"); + assert_eq!(audio_out.data, "AQID"); + + let unexpected_turn_started = tokio::time::timeout( + Duration::from_millis(200), + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::TurnStarted(_) => Some(()), + _ => None, + }), + ) + .await; + assert!(unexpected_turn_started.is_err()); + + realtime_server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()> +{ + skip_if_no_network!(Ok(())); + + let (gate_completed_tx, gate_completed_rx) = oneshot::channel(); + let first_chunks = vec![ + StreamingSseChunk { + gate: None, + body: sse_event(responses::ev_response_created("resp-1")), + }, + StreamingSseChunk { + gate: None, + body: sse_event(responses::ev_assistant_message( + "msg-1", + "assistant says hi", + )), + }, + StreamingSseChunk { + gate: Some(gate_completed_rx), + body: sse_event(responses::ev_completed("resp-1")), + }, + ]; + let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await; + + let realtime_server = start_websocket_server(vec![vec![ + vec![ + json!({ + "type": "session.created", + "session": { "id": "sess_echo_guard" } + }), + json!({ + "type": "conversation.item.added", + "item": { + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "delegate now"}] + } + }), + ], + vec![ + json!({ + "type": "conversation.item.added", + "item": { + "type": "message", + "role": "user", + "content": [{"type": "text", "text": "assistant says hi"}] + } + }), + json!({ + "type": "response.output_audio.delta", + "delta": "AQID", + "sample_rate": 24000, + "num_channels": 1 + }), + ], + ]]) + .await; + + let mut builder = test_codex().with_model("gpt-5.1").with_config({ + let realtime_base_url = realtime_server.uri().to_string(); + move |config| { + config.experimental_realtime_ws_base_url = Some(realtime_base_url); + } + }); + let test = builder.build_with_streaming_server(&api_server).await?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) if session_id == "sess_echo_guard" => Some(()), + _ => None, + }) + .await; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::ConversationItemAdded(item), + }) => item + .get("content") + .and_then(Value::as_array) + .into_iter() + .flatten() + .any(|content| content.get("text").and_then(Value::as_str) == Some("delegate now")) + .then_some(()), + _ => None, + }) + .await; + + let audio_out = tokio::time::timeout( + Duration::from_millis(500), + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::AudioOut(frame), + }) => Some(frame.clone()), + _ => None, + }), + ) + .await + .expect("timed out waiting for realtime audio after echoed user-role message"); + assert_eq!(audio_out.data, "AQID"); + + let completion = completions + .into_iter() + .next() + .expect("missing delegated turn completion"); + let _ = gate_completed_tx.send(()); + completion + .await + .expect("delegated turn request did not complete"); + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + let requests = api_server.requests().await; + assert_eq!(requests.len(), 1); + + realtime_server.shutdown().await; + api_server.shutdown().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Result<()> { + skip_if_no_network!(Ok(())); + + let (gate_completed_tx, gate_completed_rx) = oneshot::channel(); + let first_chunks = vec![ + StreamingSseChunk { + gate: None, + body: sse_event(responses::ev_response_created("resp-1")), + }, + StreamingSseChunk { + gate: Some(gate_completed_rx), + body: sse_event(responses::ev_completed("resp-1")), + }, + ]; + let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await; + + let realtime_server = start_websocket_server(vec![vec![vec![ + json!({ + "type": "session.created", + "session": { "id": "sess_non_blocking" } + }), + json!({ + "type": "conversation.item.added", + "item": { + "type": "message", + "role": "assistant", + "content": [{"type": "text", "text": "delegate now"}] + } + }), + json!({ + "type": "response.output_audio.delta", + "delta": "AQID", + "sample_rate": 24000, + "num_channels": 1 + }), + ]]]) + .await; + + let mut builder = test_codex().with_model("gpt-5.1").with_config({ + let realtime_base_url = realtime_server.uri().to_string(); + move |config| { + config.experimental_realtime_ws_base_url = Some(realtime_base_url); + } + }); + let test = builder.build_with_streaming_server(&api_server).await?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) if session_id == "sess_non_blocking" => Some(()), + _ => None, + }) + .await; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::ConversationItemAdded(item), + }) => item + .get("content") + .and_then(Value::as_array) + .into_iter() + .flatten() + .any(|content| content.get("text").and_then(Value::as_str) == Some("delegate now")) + .then_some(()), + _ => None, + }) + .await; + + let audio_out = tokio::time::timeout( + Duration::from_millis(500), + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::AudioOut(frame), + }) => Some(frame.clone()), + _ => None, + }), + ) + .await + .expect("timed out waiting for realtime audio while delegated turn was still pending"); + assert_eq!(audio_out.data, "AQID"); + + let completion = completions + .into_iter() + .next() + .expect("missing delegated turn completion"); + let _ = gate_completed_tx.send(()); + completion + .await + .expect("delegated turn request did not complete"); + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + realtime_server.shutdown().await; + api_server.shutdown().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn inbound_realtime_text_steers_active_turn() -> Result<()> { skip_if_no_network!(Ok(())); @@ -791,3 +1106,121 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> { api_server.shutdown().await; Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio() -> Result<()> { + skip_if_no_network!(Ok(())); + + let (gate_completed_tx, gate_completed_rx) = oneshot::channel(); + let first_chunks = vec![ + StreamingSseChunk { + gate: None, + body: sse_event(responses::ev_response_created("resp-1")), + }, + StreamingSseChunk { + gate: Some(gate_completed_rx), + body: sse_event(responses::ev_completed("resp-1")), + }, + ]; + let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await; + + let delegated_text = "delegate from spawn transcript"; + let realtime_server = start_websocket_server(vec![vec![vec![ + json!({ + "type": "session.created", + "session": { "id": "sess_spawn_transcript" } + }), + json!({ + "type": "conversation.item.added", + "item": { + "type": "spawn_transcript", + "seq": 1, + "full_user_transcript": delegated_text, + "delta_user_transcript": delegated_text, + "backend_prompt_messages": [{ + "role": "user", + "channel": null, + "content": delegated_text, + "content_type": "text" + }], + "transcript_source": "backend_prompt_messages" + } + }), + json!({ + "type": "response.output_audio.delta", + "delta": "AQID", + "sample_rate": 24000, + "num_channels": 1 + }), + ]]]) + .await; + + let mut builder = test_codex().with_model("gpt-5.1").with_config({ + let realtime_base_url = realtime_server.uri().to_string(); + move |config| { + config.experimental_realtime_ws_base_url = Some(realtime_base_url); + } + }); + let test = builder.build_with_streaming_server(&api_server).await?; + + test.codex + .submit(Op::RealtimeConversationStart(ConversationStartParams { + prompt: "backend prompt".to_string(), + session_id: None, + })) + .await?; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::SessionCreated { session_id }, + }) if session_id == "sess_spawn_transcript" => Some(()), + _ => None, + }) + .await; + + let _ = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::ConversationItemAdded(item), + }) => (item.get("type").and_then(Value::as_str) == Some("spawn_transcript") + && item.get("delta_user_transcript").and_then(Value::as_str) == Some(delegated_text)) + .then_some(()), + _ => None, + }) + .await; + + let audio_out = tokio::time::timeout( + Duration::from_millis(500), + wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::AudioOut(frame), + }) => Some(frame.clone()), + _ => None, + }), + ) + .await + .expect("timed out waiting for realtime audio after spawn_transcript"); + assert_eq!(audio_out.data, "AQID"); + + let completion = completions + .into_iter() + .next() + .expect("missing delegated turn completion"); + let _ = gate_completed_tx.send(()); + completion + .await + .expect("delegated turn request did not complete"); + wait_for_event(&test.codex, |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + + let requests = api_server.requests().await; + assert_eq!(requests.len(), 1); + let first_body: Value = serde_json::from_slice(&requests[0]).expect("parse first request"); + let first_texts = message_input_texts(&first_body, "user"); + assert!(first_texts.iter().any(|text| text == delegated_text)); + + realtime_server.shutdown().await; + api_server.shutdown().await; + Ok(()) +}