diff --git a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs index 48cc1d700..97d0b9e32 100644 --- a/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs +++ b/codex-rs/codex-api/src/endpoint/realtime_websocket/methods.rs @@ -25,6 +25,8 @@ use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::tungstenite::Error as WsError; use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::client::IntoClientRequest; +use tracing::debug; +use tracing::error; use tracing::info; use tracing::trace; use tungstenite::protocol::WebSocketConfig; @@ -62,15 +64,23 @@ impl WsStream { }; match command { WsCommand::Send { message, tx_result } => { + debug!("realtime websocket sending message"); let result = inner.send(message).await; let should_break = result.is_err(); + if let Err(err) = &result { + error!("realtime websocket send failed: {err}"); + } let _ = tx_result.send(result); if should_break { break; } } WsCommand::Close { tx_result } => { + info!("realtime websocket sending close"); let result = inner.close(None).await; + if let Err(err) = &result { + error!("realtime websocket close failed: {err}"); + } let _ = tx_result.send(result); break; } @@ -82,7 +92,9 @@ impl WsStream { }; match message { Ok(Message::Ping(payload)) => { + trace!(payload_len = payload.len(), "realtime websocket received ping"); if let Err(err) = inner.send(Message::Pong(payload)).await { + error!("realtime websocket failed to send pong: {err}"); let _ = tx_message.send(Err(err)); break; } @@ -93,6 +105,24 @@ impl WsStream { | Message::Close(_) | Message::Frame(_))) => { let is_close = matches!(message, Message::Close(_)); + match &message { + Message::Text(_) => trace!("realtime websocket received text frame"), + Message::Binary(binary) => { + error!( + payload_len = binary.len(), + "realtime websocket received unexpected binary frame" + ); + } + Message::Close(frame) => info!( + "realtime websocket received close frame: code={:?} reason={:?}", + frame.as_ref().map(|frame| frame.code), + frame.as_ref().map(|frame| frame.reason.as_str()) + ), + Message::Frame(_) => { + trace!("realtime websocket received raw frame"); + } + Message::Ping(_) | Message::Pong(_) => {} + } if tx_message.send(Ok(message)).is_err() { break; } @@ -101,6 +131,7 @@ impl WsStream { } } Err(err) => { + error!("realtime websocket receive failed: {err}"); let _ = tx_message.send(Err(err)); break; } @@ -108,6 +139,7 @@ impl WsStream { } } } + info!("realtime websocket pump exiting"); }); ( @@ -298,7 +330,7 @@ impl RealtimeWebsocketWriter { async fn send_json(&self, message: RealtimeOutboundMessage) -> Result<(), ApiError> { let payload = serde_json::to_string(&message) .map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?; - trace!("realtime websocket request: {payload}"); + debug!(?message, "realtime websocket request"); if self.is_closed.load(Ordering::SeqCst) { return Err(ApiError::Stream( @@ -325,12 +357,14 @@ impl RealtimeWebsocketEvents { Some(Ok(msg)) => msg, Some(Err(err)) => { self.is_closed.store(true, Ordering::SeqCst); + error!("realtime websocket read failed: {err}"); return Err(ApiError::Stream(format!( "failed to read websocket message: {err}" ))); } None => { self.is_closed.store(true, Ordering::SeqCst); + info!("realtime websocket event stream ended"); return Ok(None); } }; @@ -338,11 +372,18 @@ impl RealtimeWebsocketEvents { match msg { Message::Text(text) => { if let Some(event) = parse_realtime_event(&text) { + debug!(?event, "realtime websocket parsed event"); return Ok(Some(event)); } + debug!("realtime websocket ignored unsupported text frame"); } - Message::Close(_) => { + Message::Close(frame) => { self.is_closed.store(true, Ordering::SeqCst); + info!( + "realtime websocket closed: code={:?} reason={:?}", + frame.as_ref().map(|frame| frame.code), + frame.as_ref().map(|frame| frame.reason.as_str()) + ); return Ok(None); } Message::Binary(_) => { @@ -383,15 +424,24 @@ impl RealtimeWebsocketClient { request.headers_mut().extend(headers); info!("connecting realtime websocket: {ws_url}"); - let (stream, _) = + let (stream, response) = tokio_tungstenite::connect_async_with_config(request, Some(websocket_config()), false) .await .map_err(|err| { ApiError::Stream(format!("failed to connect realtime websocket: {err}")) })?; + info!( + ws_url = %ws_url, + status = %response.status(), + "realtime websocket connected" + ); let (stream, rx_message) = WsStream::new(stream); let connection = RealtimeWebsocketConnection::new(stream, rx_message); + debug!( + conversation_id = config.session_id.as_deref().unwrap_or(""), + "realtime websocket sending session.create" + ); connection .send_session_create(config.prompt, config.session_id) .await?; diff --git a/codex-rs/core/src/realtime_conversation.rs b/codex-rs/core/src/realtime_conversation.rs index 1b858aedc..d643d7468 100644 --- a/codex-rs/core/src/realtime_conversation.rs +++ b/codex-rs/core/src/realtime_conversation.rs @@ -31,6 +31,7 @@ use tokio::sync::Mutex; use tokio::task::JoinHandle; use tracing::debug; use tracing::error; +use tracing::info; use tracing::warn; const AUDIO_IN_QUEUE_CAPACITY: usize = 256; @@ -184,6 +185,7 @@ pub(crate) async fn handle_start( let requested_session_id = params .session_id .or_else(|| Some(sess.conversation_id.to_string())); + info!("starting realtime conversation"); let events_rx = match sess .conversation .start(api_provider, None, prompt, requested_session_id.clone()) @@ -191,11 +193,14 @@ pub(crate) async fn handle_start( { Ok(events_rx) => events_rx, Err(err) => { + error!("failed to start realtime conversation: {err}"); send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await; return Ok(()); } }; + info!("realtime conversation started"); + sess.send_event_raw(Event { id: sub_id.clone(), msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent { @@ -211,6 +216,7 @@ pub(crate) async fn handle_start( msg, }; while let Ok(event) = events_rx.recv().await { + debug!(conversation_id = %sess_clone.conversation_id, "received realtime conversation event"); let maybe_routed_text = match &event { RealtimeEvent::ConversationItemAdded(item) => { realtime_text_from_conversation_item(item) @@ -231,6 +237,7 @@ pub(crate) async fn handle_start( .await; } if let Some(()) = sess_clone.conversation.running_state().await { + info!("realtime conversation transport closed"); sess_clone .send_event_raw(ev(EventMsg::RealtimeConversationClosed( RealtimeConversationClosedEvent { @@ -250,6 +257,7 @@ pub(crate) async fn handle_audio( params: ConversationAudioParams, ) { if let Err(err) = sess.conversation.audio_in(params.frame).await { + error!("failed to append realtime audio: {err}"); send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await; } } @@ -284,6 +292,7 @@ pub(crate) async fn handle_text( debug!(text = %params.text, "[realtime-text] appending realtime conversation text input"); if let Err(err) = sess.conversation.text_in(params.text).await { + error!("failed to append realtime text: {err}"); send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await; } }