Add realtime websocket tracing (#12981)

- add transport and conversation logs around connect, close, and parse
flow
- log realtime transport failures as errors for easier debugging
This commit is contained in:
Ahmed Ibrahim 2026-02-26 22:15:18 -08:00 committed by GitHub
parent 4d180ae428
commit 53e28f18cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 62 additions and 3 deletions

View file

@ -25,6 +25,8 @@ use tokio_tungstenite::WebSocketStream;
use tokio_tungstenite::tungstenite::Error as WsError;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::tungstenite::client::IntoClientRequest;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::trace;
use tungstenite::protocol::WebSocketConfig;
@ -62,15 +64,23 @@ impl WsStream {
};
match command {
WsCommand::Send { message, tx_result } => {
debug!("realtime websocket sending message");
let result = inner.send(message).await;
let should_break = result.is_err();
if let Err(err) = &result {
error!("realtime websocket send failed: {err}");
}
let _ = tx_result.send(result);
if should_break {
break;
}
}
WsCommand::Close { tx_result } => {
info!("realtime websocket sending close");
let result = inner.close(None).await;
if let Err(err) = &result {
error!("realtime websocket close failed: {err}");
}
let _ = tx_result.send(result);
break;
}
@ -82,7 +92,9 @@ impl WsStream {
};
match message {
Ok(Message::Ping(payload)) => {
trace!(payload_len = payload.len(), "realtime websocket received ping");
if let Err(err) = inner.send(Message::Pong(payload)).await {
error!("realtime websocket failed to send pong: {err}");
let _ = tx_message.send(Err(err));
break;
}
@ -93,6 +105,24 @@ impl WsStream {
| Message::Close(_)
| Message::Frame(_))) => {
let is_close = matches!(message, Message::Close(_));
match &message {
Message::Text(_) => trace!("realtime websocket received text frame"),
Message::Binary(binary) => {
error!(
payload_len = binary.len(),
"realtime websocket received unexpected binary frame"
);
}
Message::Close(frame) => info!(
"realtime websocket received close frame: code={:?} reason={:?}",
frame.as_ref().map(|frame| frame.code),
frame.as_ref().map(|frame| frame.reason.as_str())
),
Message::Frame(_) => {
trace!("realtime websocket received raw frame");
}
Message::Ping(_) | Message::Pong(_) => {}
}
if tx_message.send(Ok(message)).is_err() {
break;
}
@ -101,6 +131,7 @@ impl WsStream {
}
}
Err(err) => {
error!("realtime websocket receive failed: {err}");
let _ = tx_message.send(Err(err));
break;
}
@ -108,6 +139,7 @@ impl WsStream {
}
}
}
info!("realtime websocket pump exiting");
});
(
@ -298,7 +330,7 @@ impl RealtimeWebsocketWriter {
async fn send_json(&self, message: RealtimeOutboundMessage) -> Result<(), ApiError> {
let payload = serde_json::to_string(&message)
.map_err(|err| ApiError::Stream(format!("failed to encode realtime request: {err}")))?;
trace!("realtime websocket request: {payload}");
debug!(?message, "realtime websocket request");
if self.is_closed.load(Ordering::SeqCst) {
return Err(ApiError::Stream(
@ -325,12 +357,14 @@ impl RealtimeWebsocketEvents {
Some(Ok(msg)) => msg,
Some(Err(err)) => {
self.is_closed.store(true, Ordering::SeqCst);
error!("realtime websocket read failed: {err}");
return Err(ApiError::Stream(format!(
"failed to read websocket message: {err}"
)));
}
None => {
self.is_closed.store(true, Ordering::SeqCst);
info!("realtime websocket event stream ended");
return Ok(None);
}
};
@ -338,11 +372,18 @@ impl RealtimeWebsocketEvents {
match msg {
Message::Text(text) => {
if let Some(event) = parse_realtime_event(&text) {
debug!(?event, "realtime websocket parsed event");
return Ok(Some(event));
}
debug!("realtime websocket ignored unsupported text frame");
}
Message::Close(_) => {
Message::Close(frame) => {
self.is_closed.store(true, Ordering::SeqCst);
info!(
"realtime websocket closed: code={:?} reason={:?}",
frame.as_ref().map(|frame| frame.code),
frame.as_ref().map(|frame| frame.reason.as_str())
);
return Ok(None);
}
Message::Binary(_) => {
@ -383,15 +424,24 @@ impl RealtimeWebsocketClient {
request.headers_mut().extend(headers);
info!("connecting realtime websocket: {ws_url}");
let (stream, _) =
let (stream, response) =
tokio_tungstenite::connect_async_with_config(request, Some(websocket_config()), false)
.await
.map_err(|err| {
ApiError::Stream(format!("failed to connect realtime websocket: {err}"))
})?;
info!(
ws_url = %ws_url,
status = %response.status(),
"realtime websocket connected"
);
let (stream, rx_message) = WsStream::new(stream);
let connection = RealtimeWebsocketConnection::new(stream, rx_message);
debug!(
conversation_id = config.session_id.as_deref().unwrap_or("<none>"),
"realtime websocket sending session.create"
);
connection
.send_session_create(config.prompt, config.session_id)
.await?;

View file

@ -31,6 +31,7 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::error;
use tracing::info;
use tracing::warn;
const AUDIO_IN_QUEUE_CAPACITY: usize = 256;
@ -184,6 +185,7 @@ pub(crate) async fn handle_start(
let requested_session_id = params
.session_id
.or_else(|| Some(sess.conversation_id.to_string()));
info!("starting realtime conversation");
let events_rx = match sess
.conversation
.start(api_provider, None, prompt, requested_session_id.clone())
@ -191,11 +193,14 @@ pub(crate) async fn handle_start(
{
Ok(events_rx) => events_rx,
Err(err) => {
error!("failed to start realtime conversation: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::Other).await;
return Ok(());
}
};
info!("realtime conversation started");
sess.send_event_raw(Event {
id: sub_id.clone(),
msg: EventMsg::RealtimeConversationStarted(RealtimeConversationStartedEvent {
@ -211,6 +216,7 @@ pub(crate) async fn handle_start(
msg,
};
while let Ok(event) = events_rx.recv().await {
debug!(conversation_id = %sess_clone.conversation_id, "received realtime conversation event");
let maybe_routed_text = match &event {
RealtimeEvent::ConversationItemAdded(item) => {
realtime_text_from_conversation_item(item)
@ -231,6 +237,7 @@ pub(crate) async fn handle_start(
.await;
}
if let Some(()) = sess_clone.conversation.running_state().await {
info!("realtime conversation transport closed");
sess_clone
.send_event_raw(ev(EventMsg::RealtimeConversationClosed(
RealtimeConversationClosedEvent {
@ -250,6 +257,7 @@ pub(crate) async fn handle_audio(
params: ConversationAudioParams,
) {
if let Err(err) = sess.conversation.audio_in(params.frame).await {
error!("failed to append realtime audio: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
}
}
@ -284,6 +292,7 @@ pub(crate) async fn handle_text(
debug!(text = %params.text, "[realtime-text] appending realtime conversation text input");
if let Err(err) = sess.conversation.text_in(params.text).await {
error!("failed to append realtime text: {err}");
send_conversation_error(sess, sub_id, err.to_string(), CodexErrorInfo::BadRequest).await;
}
}