diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index ca5075572..6a073d926 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -12,6 +12,7 @@ use futures::SinkExt; use futures::StreamExt; use serde_json::Value; use tokio::net::TcpListener; +use tokio::sync::Notify; use tokio::sync::oneshot; use tokio_tungstenite::accept_hdr_async_with_config; use tokio_tungstenite::tungstenite::Message; @@ -335,6 +336,7 @@ pub struct WebSocketTestServer { uri: String, connections: Arc>>>, handshakes: Arc>>, + request_log_updated: Arc, shutdown: oneshot::Sender<()>, task: tokio::task::JoinHandle<()>, } @@ -356,6 +358,26 @@ impl WebSocketTestServer { connections.first().cloned().unwrap_or_default() } + pub async fn wait_for_request( + &self, + connection_index: usize, + request_index: usize, + ) -> WebSocketRequest { + loop { + if let Some(request) = self + .connections + .lock() + .unwrap() + .get(connection_index) + .and_then(|connection| connection.get(request_index)) + .cloned() + { + return request; + } + self.request_log_updated.notified().await; + } + } + pub fn handshakes(&self) -> Vec { self.handshakes.lock().unwrap().clone() } @@ -1069,6 +1091,7 @@ pub async fn start_websocket_server(connections: Vec>>) -> WebSoc pub async fn start_websocket_server_with_headers( connections: Vec, ) -> WebSocketTestServer { + let start = std::time::Instant::now(); let listener = TcpListener::bind("127.0.0.1:0") .await .expect("bind websocket server"); @@ -1076,8 +1099,10 @@ pub async fn start_websocket_server_with_headers( let uri = format!("ws://{addr}"); let connections_log = Arc::new(Mutex::new(Vec::new())); let handshakes_log = Arc::new(Mutex::new(Vec::new())); + let request_log_updated = Arc::new(Notify::new()); let requests = Arc::clone(&connections_log); let handshakes = Arc::clone(&handshakes_log); + let request_log = Arc::clone(&request_log_updated); let connections = Arc::new(Mutex::new(VecDeque::from(connections))); let (shutdown_tx, mut shutdown_rx) = oneshot::channel(); @@ -1159,9 +1184,51 @@ pub async fn start_websocket_server_with_headers( let mut log = requests.lock().unwrap(); if let Some(connection_log) = log.get_mut(connection_index) { connection_log.push(WebSocketRequest { body }); + let request_index = connection_log.len() - 1; + let request = &connection_log[request_index]; + let request_body = request.body_json(); + eprintln!( + "[ws test server +{}ms] connection={} received request={} type={:?} role={:?} text={:?} data={:?}", + start.elapsed().as_millis(), + connection_index, + request_index, + request_body.get("type").and_then(Value::as_str), + request_body + .get("item") + .and_then(|item| item.get("role")) + .and_then(Value::as_str), + request_body + .get("item") + .and_then(|item| item.get("content")) + .and_then(Value::as_array) + .and_then(|content| content.first()) + .and_then(|content| content.get("text")) + .and_then(Value::as_str), + request_body + .get("item") + .and_then(|item| item.get("content")) + .and_then(Value::as_array) + .and_then(|content| content.first()) + .and_then(|content| content.get("data")) + .and_then(Value::as_str), + ); } + request_log.notify_waiters(); } + eprintln!( + "[ws test server +{}ms] connection={} sending batch_size={} event_types={:?} audio_data={:?}", + start.elapsed().as_millis(), + connection_index, + request_events.len(), + request_events + .iter() + .map(|event| event.get("type").and_then(Value::as_str)) + .collect::>(), + request_events + .iter() + .find_map(|event| event.get("delta").and_then(Value::as_str)), + ); for event in &request_events { let Ok(payload) = serde_json::to_string(event) else { continue; @@ -1184,6 +1251,7 @@ pub async fn start_websocket_server_with_headers( uri, connections: connections_log, handshakes: handshakes_log, + request_log_updated, shutdown: shutdown_tx, task, } diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 98848e911..1f7504d41 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -715,6 +715,7 @@ async fn inbound_realtime_text_ignores_user_role_and_still_forwards_audio() -> R async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_audio() -> Result<()> { skip_if_no_network!(Ok(())); + let start = std::time::Instant::now(); let (gate_completed_tx, gate_completed_rx) = oneshot::channel(); let first_chunks = vec![ @@ -806,18 +807,45 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au _ => None, }) .await; + eprintln!( + "[realtime test +{}ms] saw trigger text={:?}", + start.elapsed().as_millis(), + "delegate now" + ); - let audio_out = tokio::time::timeout( - Duration::from_millis(500), - wait_for_event_match(&test.codex, |msg| match msg { - EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { - payload: RealtimeEvent::AudioOut(frame), - }) => Some(frame.clone()), - _ => None, - }), - ) - .await - .expect("timed out waiting for realtime audio after echoed user-role message"); + let mirrored_request = realtime_server.wait_for_request(0, 1).await; + let mirrored_request_body = mirrored_request.body_json(); + eprintln!( + "[realtime test +{}ms] saw mirrored request type={:?} role={:?} text={:?} data={:?}", + start.elapsed().as_millis(), + mirrored_request_body["type"].as_str(), + mirrored_request_body["item"]["role"].as_str(), + mirrored_request_body["item"]["content"][0]["text"].as_str(), + mirrored_request_body["item"]["content"][0]["data"].as_str(), + ); + assert_eq!( + mirrored_request_body["type"].as_str(), + Some("conversation.item.create") + ); + assert_eq!( + mirrored_request_body["item"]["content"][0]["text"].as_str(), + Some("assistant says hi") + ); + + let audio_out = wait_for_event_match(&test.codex, |msg| match msg { + EventMsg::RealtimeConversationRealtime(RealtimeConversationRealtimeEvent { + payload: RealtimeEvent::AudioOut(frame), + }) => Some(frame.clone()), + _ => None, + }) + .await; + eprintln!( + "[realtime test +{}ms] saw audio out data={} sample_rate={} num_channels={}", + start.elapsed().as_millis(), + audio_out.data, + audio_out.sample_rate, + audio_out.num_channels + ); assert_eq!(audio_out.data, "AQID"); let completion = completions @@ -828,6 +856,10 @@ async fn delegated_turn_user_role_echo_does_not_redelegate_and_still_forwards_au completion .await .expect("delegated turn request did not complete"); + eprintln!( + "[realtime test +{}ms] delegated completion resolved", + start.elapsed().as_millis() + ); wait_for_event(&test.codex, |event| { matches!(event, EventMsg::TurnComplete(_)) })