Handle realtime spawn_transcript delegation (#12619)
This commit is contained in:
parent
855e275591
commit
10a3adad8e
2 changed files with 481 additions and 18 deletions
|
|
@ -217,7 +217,8 @@ pub(crate) async fn handle_start(
|
|||
_ => None,
|
||||
};
|
||||
if let Some(text) = maybe_routed_text {
|
||||
sess_clone.route_realtime_text_input(text).await;
|
||||
let sess_for_routed_text = Arc::clone(&sess_clone);
|
||||
sess_for_routed_text.route_realtime_text_input(text).await;
|
||||
}
|
||||
sess_clone
|
||||
.send_event_raw(ev(EventMsg::RealtimeConversationRealtime(
|
||||
|
|
@ -252,16 +253,25 @@ pub(crate) async fn handle_audio(
|
|||
}
|
||||
|
||||
fn realtime_text_from_conversation_item(item: &Value) -> Option<String> {
|
||||
if item.get("type").and_then(Value::as_str) != Some("message") {
|
||||
return None;
|
||||
match item.get("type").and_then(Value::as_str) {
|
||||
Some("message") => {
|
||||
if item.get("role").and_then(Value::as_str) != Some("assistant") {
|
||||
return None;
|
||||
}
|
||||
let content = item.get("content")?.as_array()?;
|
||||
let text = content
|
||||
.iter()
|
||||
.filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text"))
|
||||
.filter_map(|entry| entry.get("text").and_then(Value::as_str))
|
||||
.collect::<String>();
|
||||
if text.is_empty() { None } else { Some(text) }
|
||||
}
|
||||
Some("spawn_transcript") => item
|
||||
.get("delta_user_transcript")
|
||||
.and_then(Value::as_str)
|
||||
.and_then(|text| (!text.is_empty()).then(|| text.to_string())),
|
||||
Some(_) | None => None,
|
||||
}
|
||||
let content = item.get("content")?.as_array()?;
|
||||
let text = content
|
||||
.iter()
|
||||
.filter(|entry| entry.get("type").and_then(Value::as_str) == Some("text"))
|
||||
.filter_map(|entry| entry.get("text").and_then(Value::as_str))
|
||||
.collect::<String>();
|
||||
if text.is_empty() { None } else { Some(text) }
|
||||
}
|
||||
|
||||
pub(crate) async fn handle_text(
|
||||
|
|
@ -301,7 +311,6 @@ fn spawn_realtime_input_task(
|
|||
tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
text = text_rx.recv() => {
|
||||
match text {
|
||||
Ok(text) => {
|
||||
|
|
@ -388,7 +397,7 @@ mod tests {
|
|||
use serde_json::json;
|
||||
|
||||
#[test]
|
||||
fn extracts_text_from_message_items_ignoring_role() {
|
||||
fn extracts_text_from_assistant_message_items_only() {
|
||||
let assistant = json!({
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
|
|
@ -404,16 +413,14 @@ mod tests {
|
|||
"role": "user",
|
||||
"content": [{"type": "text", "text": "world"}],
|
||||
});
|
||||
assert_eq!(
|
||||
realtime_text_from_conversation_item(&user),
|
||||
Some("world".to_string())
|
||||
);
|
||||
assert_eq!(realtime_text_from_conversation_item(&user), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extracts_and_concatenates_text_entries_only() {
|
||||
let item = json!({
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [
|
||||
{"type": "text", "text": "a"},
|
||||
{"type": "ignored", "text": "x"},
|
||||
|
|
@ -436,8 +443,31 @@ mod tests {
|
|||
|
||||
let no_text = json!({
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "other", "value": 1}],
|
||||
});
|
||||
assert_eq!(realtime_text_from_conversation_item(&no_text), None);
|
||||
|
||||
let empty_spawn_transcript = json!({
|
||||
"type": "spawn_transcript",
|
||||
"delta_user_transcript": "",
|
||||
});
|
||||
assert_eq!(
|
||||
realtime_text_from_conversation_item(&empty_spawn_transcript),
|
||||
None
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn extracts_text_from_spawn_transcript_items() {
|
||||
let item = json!({
|
||||
"type": "spawn_transcript",
|
||||
"delta_user_transcript": "delegate from transcript",
|
||||
"backend_prompt_messages": [{"role": "user", "content": "delegate from transcript"}],
|
||||
});
|
||||
assert_eq!(
|
||||
realtime_text_from_conversation_item(&item),
|
||||
Some("delegate from transcript".to_string())
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -566,7 +566,7 @@ fn message_input_texts(body: &Value, role: &str) -> Vec<String> {
|
|||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_realtime_text_starts_turn_and_ignores_role() -> Result<()> {
|
||||
async fn inbound_realtime_text_starts_turn_for_assistant_role() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
|
|
@ -589,7 +589,7 @@ async fn inbound_realtime_text_starts_turn_and_ignores_role() -> Result<()> {
|
|||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "user",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "text from realtime"}]
|
||||
}
|
||||
}),
|
||||
|
|
@ -633,6 +633,321 @@ async fn inbound_realtime_text_starts_turn_and_ignores_role() -> Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let api_server = start_mock_server().await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_ignore_user_role" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "user",
|
||||
"content": [{"type": "text", "text": "echoed local text"}]
|
||||
}
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
}) if session_id == "sess_ignore_user_role" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let audio_out = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(frame),
|
||||
}) => Some(frame.clone()),
|
||||
_ => None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("timed out waiting for realtime audio after user-role conversation item");
|
||||
assert_eq!(audio_out.data, "AQID");
|
||||
|
||||
let unexpected_turn_started = tokio::time::timeout(
|
||||
Duration::from_millis(200),
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::TurnStarted(_) => Some(()),
|
||||
_ => None,
|
||||
}),
|
||||
)
|
||||
.await;
|
||||
assert!(unexpected_turn_started.is_err());
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()>
|
||||
{
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
let first_chunks = vec![
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(responses::ev_response_created("resp-1")),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(responses::ev_assistant_message(
|
||||
"msg-1",
|
||||
"assistant says hi",
|
||||
)),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: Some(gate_completed_rx),
|
||||
body: sse_event(responses::ev_completed("resp-1")),
|
||||
},
|
||||
];
|
||||
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![
|
||||
vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_echo_guard" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "delegate now"}]
|
||||
}
|
||||
}),
|
||||
],
|
||||
vec![
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "user",
|
||||
"content": [{"type": "text", "text": "assistant says hi"}]
|
||||
}
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
}),
|
||||
],
|
||||
]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_model("gpt-5.1").with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_streaming_server(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
}) if session_id == "sess_echo_guard" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::ConversationItemAdded(item),
|
||||
}) => item
|
||||
.get("content")
|
||||
.and_then(Value::as_array)
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.any(|content| content.get("text").and_then(Value::as_str) == Some("delegate now"))
|
||||
.then_some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let audio_out = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(frame),
|
||||
}) => Some(frame.clone()),
|
||||
_ => None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("timed out waiting for realtime audio after echoed user-role message");
|
||||
assert_eq!(audio_out.data, "AQID");
|
||||
|
||||
let completion = completions
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("missing delegated turn completion");
|
||||
let _ = gate_completed_tx.send(());
|
||||
completion
|
||||
.await
|
||||
.expect("delegated turn request did not complete");
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let requests = api_server.requests().await;
|
||||
assert_eq!(requests.len(), 1);
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
api_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_realtime_text_does_not_block_realtime_event_forwarding() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
let first_chunks = vec![
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(responses::ev_response_created("resp-1")),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: Some(gate_completed_rx),
|
||||
body: sse_event(responses::ev_completed("resp-1")),
|
||||
},
|
||||
];
|
||||
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
||||
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_non_blocking" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "message",
|
||||
"role": "assistant",
|
||||
"content": [{"type": "text", "text": "delegate now"}]
|
||||
}
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_model("gpt-5.1").with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_streaming_server(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
}) if session_id == "sess_non_blocking" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::ConversationItemAdded(item),
|
||||
}) => item
|
||||
.get("content")
|
||||
.and_then(Value::as_array)
|
||||
.into_iter()
|
||||
.flatten()
|
||||
.any(|content| content.get("text").and_then(Value::as_str) == Some("delegate now"))
|
||||
.then_some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let audio_out = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(frame),
|
||||
}) => Some(frame.clone()),
|
||||
_ => None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("timed out waiting for realtime audio while delegated turn was still pending");
|
||||
assert_eq!(audio_out.data, "AQID");
|
||||
|
||||
let completion = completions
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("missing delegated turn completion");
|
||||
let _ = gate_completed_tx.send(());
|
||||
completion
|
||||
.await
|
||||
.expect("delegated turn request did not complete");
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
api_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_realtime_text_steers_active_turn() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
|
@ -791,3 +1106,121 @@ async fn inbound_realtime_text_steers_active_turn() -> Result<()> {
|
|||
api_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn inbound_spawn_transcript_starts_turn_and_does_not_block_realtime_audio() -> Result<()> {
|
||||
skip_if_no_network!(Ok(()));
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
let first_chunks = vec![
|
||||
StreamingSseChunk {
|
||||
gate: None,
|
||||
body: sse_event(responses::ev_response_created("resp-1")),
|
||||
},
|
||||
StreamingSseChunk {
|
||||
gate: Some(gate_completed_rx),
|
||||
body: sse_event(responses::ev_completed("resp-1")),
|
||||
},
|
||||
];
|
||||
let (api_server, completions) = start_streaming_sse_server(vec![first_chunks]).await;
|
||||
|
||||
let delegated_text = "delegate from spawn transcript";
|
||||
let realtime_server = start_websocket_server(vec![vec![vec![
|
||||
json!({
|
||||
"type": "session.created",
|
||||
"session": { "id": "sess_spawn_transcript" }
|
||||
}),
|
||||
json!({
|
||||
"type": "conversation.item.added",
|
||||
"item": {
|
||||
"type": "spawn_transcript",
|
||||
"seq": 1,
|
||||
"full_user_transcript": delegated_text,
|
||||
"delta_user_transcript": delegated_text,
|
||||
"backend_prompt_messages": [{
|
||||
"role": "user",
|
||||
"channel": null,
|
||||
"content": delegated_text,
|
||||
"content_type": "text"
|
||||
}],
|
||||
"transcript_source": "backend_prompt_messages"
|
||||
}
|
||||
}),
|
||||
json!({
|
||||
"type": "response.output_audio.delta",
|
||||
"delta": "AQID",
|
||||
"sample_rate": 24000,
|
||||
"num_channels": 1
|
||||
}),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let mut builder = test_codex().with_model("gpt-5.1").with_config({
|
||||
let realtime_base_url = realtime_server.uri().to_string();
|
||||
move |config| {
|
||||
config.experimental_realtime_ws_base_url = Some(realtime_base_url);
|
||||
}
|
||||
});
|
||||
let test = builder.build_with_streaming_server(&api_server).await?;
|
||||
|
||||
test.codex
|
||||
.submit(Op::RealtimeConversationStart(ConversationStartParams {
|
||||
prompt: "backend prompt".to_string(),
|
||||
session_id: None,
|
||||
}))
|
||||
.await?;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::SessionCreated { session_id },
|
||||
}) if session_id == "sess_spawn_transcript" => Some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let _ = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::ConversationItemAdded(item),
|
||||
}) => (item.get("type").and_then(Value::as_str) == Some("spawn_transcript")
|
||||
&& item.get("delta_user_transcript").and_then(Value::as_str) == Some(delegated_text))
|
||||
.then_some(()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
|
||||
let audio_out = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(frame),
|
||||
}) => Some(frame.clone()),
|
||||
_ => None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("timed out waiting for realtime audio after spawn_transcript");
|
||||
assert_eq!(audio_out.data, "AQID");
|
||||
|
||||
let completion = completions
|
||||
.into_iter()
|
||||
.next()
|
||||
.expect("missing delegated turn completion");
|
||||
let _ = gate_completed_tx.send(());
|
||||
completion
|
||||
.await
|
||||
.expect("delegated turn request did not complete");
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
.await;
|
||||
|
||||
let requests = api_server.requests().await;
|
||||
assert_eq!(requests.len(), 1);
|
||||
let first_body: Value = serde_json::from_slice(&requests[0]).expect("parse first request");
|
||||
let first_texts = message_input_texts(&first_body, "user");
|
||||
assert!(first_texts.iter().any(|text| text == delegated_text));
|
||||
|
||||
realtime_server.shutdown().await;
|
||||
api_server.shutdown().await;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue