core-agent-ide/codex-rs/codex-api/src/telemetry.rs
Anton Panasenko 101d359cd7
Add websocket telemetry metrics and labels (#10316)
Summary
- expose websocket telemetry hooks through the responses client so
request durations and event processing can be reported
- record websocket request/event metrics and emit runtime telemetry
events that the history UI now surfaces
- improve tests to cover websocket telemetry reporting and guard runtime
summary updates


<img width="824" height="79" alt="Screenshot 2026-01-31 at 5 28 12 PM"
src="https://github.com/user-attachments/assets/ea9a7965-d8b4-4e3c-a984-ef4fdc44c81d"
/>
2026-01-31 19:16:44 -08:00

98 lines
2.6 KiB
Rust

use crate::error::ApiError;
use codex_client::Request;
use codex_client::RequestTelemetry;
use codex_client::Response;
use codex_client::RetryPolicy;
use codex_client::StreamResponse;
use codex_client::TransportError;
use codex_client::run_with_retry;
use http::StatusCode;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use tokio_tungstenite::tungstenite::Error;
use tokio_tungstenite::tungstenite::Message;
/// Generic telemetry.
pub trait SseTelemetry: Send + Sync {
fn on_sse_poll(
&self,
result: &Result<
Option<
Result<
eventsource_stream::Event,
eventsource_stream::EventStreamError<TransportError>,
>,
>,
tokio::time::error::Elapsed,
>,
duration: Duration,
);
}
/// Telemetry for Responses WebSocket transport.
pub trait WebsocketTelemetry: Send + Sync {
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>);
fn on_ws_event(
&self,
result: &Result<Option<Result<Message, Error>>, ApiError>,
duration: Duration,
);
}
pub(crate) trait WithStatus {
fn status(&self) -> StatusCode;
}
fn http_status(err: &TransportError) -> Option<StatusCode> {
match err {
TransportError::Http { status, .. } => Some(*status),
_ => None,
}
}
impl WithStatus for Response {
fn status(&self) -> StatusCode {
self.status
}
}
impl WithStatus for StreamResponse {
fn status(&self) -> StatusCode {
self.status
}
}
pub(crate) async fn run_with_request_telemetry<T, F, Fut>(
policy: RetryPolicy,
telemetry: Option<Arc<dyn RequestTelemetry>>,
make_request: impl FnMut() -> Request,
send: F,
) -> Result<T, TransportError>
where
T: WithStatus,
F: Clone + Fn(Request) -> Fut,
Fut: Future<Output = Result<T, TransportError>>,
{
// Wraps `run_with_retry` to attach per-attempt request telemetry for both
// unary and streaming HTTP calls.
run_with_retry(policy, make_request, move |req, attempt| {
let telemetry = telemetry.clone();
let send = send.clone();
async move {
let start = Instant::now();
let result = send(req).await;
if let Some(t) = telemetry.as_ref() {
let (status, err) = match &result {
Ok(resp) => (Some(resp.status()), None),
Err(err) => (http_status(err), Some(err)),
};
t.on_request(attempt, status, err, start.elapsed());
}
result
}
})
.await
}