Make realtime audio test deterministic (#12959)
## Summary\n- add a websocket test-server request waiter so tests can synchronize on recorded client messages\n- use that waiter in the realtime delegation test instead of a fixed audio timeout\n- add temporary timing logs in the test and websocket mock to inspect where the flake stalls
This commit is contained in:
parent
90cc4e79a2
commit
a11da86b37
2 changed files with 111 additions and 11 deletions
|
|
@ -12,6 +12,7 @@ use futures::SinkExt;
|
|||
use futures::StreamExt;
|
||||
use serde_json::Value;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Notify;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_tungstenite::accept_hdr_async_with_config;
|
||||
use tokio_tungstenite::tungstenite::Message;
|
||||
|
|
@ -335,6 +336,7 @@ pub struct WebSocketTestServer {
|
|||
uri: String,
|
||||
connections: Arc<Mutex<Vec<Vec<WebSocketRequest>>>>,
|
||||
handshakes: Arc<Mutex<Vec<WebSocketHandshake>>>,
|
||||
request_log_updated: Arc<Notify>,
|
||||
shutdown: oneshot::Sender<()>,
|
||||
task: tokio::task::JoinHandle<()>,
|
||||
}
|
||||
|
|
@ -356,6 +358,26 @@ impl WebSocketTestServer {
|
|||
connections.first().cloned().unwrap_or_default()
|
||||
}
|
||||
|
||||
pub async fn wait_for_request(
|
||||
&self,
|
||||
connection_index: usize,
|
||||
request_index: usize,
|
||||
) -> WebSocketRequest {
|
||||
loop {
|
||||
if let Some(request) = self
|
||||
.connections
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get(connection_index)
|
||||
.and_then(|connection| connection.get(request_index))
|
||||
.cloned()
|
||||
{
|
||||
return request;
|
||||
}
|
||||
self.request_log_updated.notified().await;
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handshakes(&self) -> Vec<WebSocketHandshake> {
|
||||
self.handshakes.lock().unwrap().clone()
|
||||
}
|
||||
|
|
@ -1069,6 +1091,7 @@ pub async fn start_websocket_server(connections: Vec<Vec<Vec<Value>>>) -> WebSoc
|
|||
pub async fn start_websocket_server_with_headers(
|
||||
connections: Vec<WebSocketConnectionConfig>,
|
||||
) -> WebSocketTestServer {
|
||||
let start = std::time::Instant::now();
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.expect("bind websocket server");
|
||||
|
|
@ -1076,8 +1099,10 @@ pub async fn start_websocket_server_with_headers(
|
|||
let uri = format!("ws://{addr}");
|
||||
let connections_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let handshakes_log = Arc::new(Mutex::new(Vec::new()));
|
||||
let request_log_updated = Arc::new(Notify::new());
|
||||
let requests = Arc::clone(&connections_log);
|
||||
let handshakes = Arc::clone(&handshakes_log);
|
||||
let request_log = Arc::clone(&request_log_updated);
|
||||
let connections = Arc::new(Mutex::new(VecDeque::from(connections)));
|
||||
let (shutdown_tx, mut shutdown_rx) = oneshot::channel();
|
||||
|
||||
|
|
@ -1159,9 +1184,51 @@ pub async fn start_websocket_server_with_headers(
|
|||
let mut log = requests.lock().unwrap();
|
||||
if let Some(connection_log) = log.get_mut(connection_index) {
|
||||
connection_log.push(WebSocketRequest { body });
|
||||
let request_index = connection_log.len() - 1;
|
||||
let request = &connection_log[request_index];
|
||||
let request_body = request.body_json();
|
||||
eprintln!(
|
||||
"[ws test server +{}ms] connection={} received request={} type={:?} role={:?} text={:?} data={:?}",
|
||||
start.elapsed().as_millis(),
|
||||
connection_index,
|
||||
request_index,
|
||||
request_body.get("type").and_then(Value::as_str),
|
||||
request_body
|
||||
.get("item")
|
||||
.and_then(|item| item.get("role"))
|
||||
.and_then(Value::as_str),
|
||||
request_body
|
||||
.get("item")
|
||||
.and_then(|item| item.get("content"))
|
||||
.and_then(Value::as_array)
|
||||
.and_then(|content| content.first())
|
||||
.and_then(|content| content.get("text"))
|
||||
.and_then(Value::as_str),
|
||||
request_body
|
||||
.get("item")
|
||||
.and_then(|item| item.get("content"))
|
||||
.and_then(Value::as_array)
|
||||
.and_then(|content| content.first())
|
||||
.and_then(|content| content.get("data"))
|
||||
.and_then(Value::as_str),
|
||||
);
|
||||
}
|
||||
request_log.notify_waiters();
|
||||
}
|
||||
|
||||
eprintln!(
|
||||
"[ws test server +{}ms] connection={} sending batch_size={} event_types={:?} audio_data={:?}",
|
||||
start.elapsed().as_millis(),
|
||||
connection_index,
|
||||
request_events.len(),
|
||||
request_events
|
||||
.iter()
|
||||
.map(|event| event.get("type").and_then(Value::as_str))
|
||||
.collect::<Vec<_>>(),
|
||||
request_events
|
||||
.iter()
|
||||
.find_map(|event| event.get("delta").and_then(Value::as_str)),
|
||||
);
|
||||
for event in &request_events {
|
||||
let Ok(payload) = serde_json::to_string(event) else {
|
||||
continue;
|
||||
|
|
@ -1184,6 +1251,7 @@ pub async fn start_websocket_server_with_headers(
|
|||
uri,
|
||||
connections: connections_log,
|
||||
handshakes: handshakes_log,
|
||||
request_log_updated,
|
||||
shutdown: shutdown_tx,
|
||||
task,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -715,6 +715,7 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R
|
|||
async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()>
|
||||
{
|
||||
skip_if_no_network!(Ok(()));
|
||||
let start = std::time::Instant::now();
|
||||
|
||||
let (gate_completed_tx, gate_completed_rx) = oneshot::channel();
|
||||
let first_chunks = vec![
|
||||
|
|
@ -806,18 +807,45 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
|||
_ => None,
|
||||
})
|
||||
.await;
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] saw trigger text={:?}",
|
||||
start.elapsed().as_millis(),
|
||||
"delegate now"
|
||||
);
|
||||
|
||||
let audio_out = tokio::time::timeout(
|
||||
Duration::from_millis(500),
|
||||
wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(frame),
|
||||
}) => Some(frame.clone()),
|
||||
_ => None,
|
||||
}),
|
||||
)
|
||||
.await
|
||||
.expect("timed out waiting for realtime audio after echoed user-role message");
|
||||
let mirrored_request = realtime_server.wait_for_request(0, 1).await;
|
||||
let mirrored_request_body = mirrored_request.body_json();
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] saw mirrored request type={:?} role={:?} text={:?} data={:?}",
|
||||
start.elapsed().as_millis(),
|
||||
mirrored_request_body["type"].as_str(),
|
||||
mirrored_request_body["item"]["role"].as_str(),
|
||||
mirrored_request_body["item"]["content"][0]["text"].as_str(),
|
||||
mirrored_request_body["item"]["content"][0]["data"].as_str(),
|
||||
);
|
||||
assert_eq!(
|
||||
mirrored_request_body["type"].as_str(),
|
||||
Some("conversation.item.create")
|
||||
);
|
||||
assert_eq!(
|
||||
mirrored_request_body["item"]["content"][0]["text"].as_str(),
|
||||
Some("assistant says hi")
|
||||
);
|
||||
|
||||
let audio_out = wait_for_event_match(&test.codex, |msg| match msg {
|
||||
EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent {
|
||||
payload: RealtimeEvent::AudioOut(frame),
|
||||
}) => Some(frame.clone()),
|
||||
_ => None,
|
||||
})
|
||||
.await;
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] saw audio out data={} sample_rate={} num_channels={}",
|
||||
start.elapsed().as_millis(),
|
||||
audio_out.data,
|
||||
audio_out.sample_rate,
|
||||
audio_out.num_channels
|
||||
);
|
||||
assert_eq!(audio_out.data, "AQID");
|
||||
|
||||
let completion = completions
|
||||
|
|
@ -828,6 +856,10 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au
|
|||
completion
|
||||
.await
|
||||
.expect("delegated turn request did not complete");
|
||||
eprintln!(
|
||||
"[realtime test +{}ms] delegated completion resolved",
|
||||
start.elapsed().as_millis()
|
||||
);
|
||||
wait_for_event(&test.codex, |event| {
|
||||
matches!(event, EventMsg::TurnComplete(_))
|
||||
})
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue