2026-01-31 19:16:44 -08:00
|
|
|
use crate::error::ApiError;
|
2025-11-25 18:06:12 +00:00
|
|
|
use codex_client::Request;
|
|
|
|
|
use codex_client::RequestTelemetry;
|
|
|
|
|
use codex_client::Response;
|
|
|
|
|
use codex_client::RetryPolicy;
|
|
|
|
|
use codex_client::StreamResponse;
|
|
|
|
|
use codex_client::TransportError;
|
|
|
|
|
use codex_client::run_with_retry;
|
|
|
|
|
use http::StatusCode;
|
|
|
|
|
use std::future::Future;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use std::time::Duration;
|
|
|
|
|
use tokio::time::Instant;
|
2026-01-31 19:16:44 -08:00
|
|
|
use tokio_tungstenite::tungstenite::Error;
|
|
|
|
|
use tokio_tungstenite::tungstenite::Message;
|
2025-11-25 18:06:12 +00:00
|
|
|
|
|
|
|
|
/// Generic telemetry.
|
|
|
|
|
pub trait SseTelemetry: Send + Sync {
|
|
|
|
|
fn on_sse_poll(
|
|
|
|
|
&self,
|
|
|
|
|
result: &Result<
|
|
|
|
|
Option<
|
|
|
|
|
Result<
|
|
|
|
|
eventsource_stream::Event,
|
|
|
|
|
eventsource_stream::EventStreamError<TransportError>,
|
|
|
|
|
>,
|
|
|
|
|
>,
|
|
|
|
|
tokio::time::error::Elapsed,
|
|
|
|
|
>,
|
|
|
|
|
duration: Duration,
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
2026-01-31 19:16:44 -08:00
|
|
|
/// Telemetry for Responses WebSocket transport.
|
|
|
|
|
pub trait WebsocketTelemetry: Send + Sync {
|
|
|
|
|
fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>);
|
|
|
|
|
|
|
|
|
|
fn on_ws_event(
|
|
|
|
|
&self,
|
|
|
|
|
result: &Result<Option<Result<Message, Error>>, ApiError>,
|
|
|
|
|
duration: Duration,
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
|
2025-11-25 18:06:12 +00:00
|
|
|
pub(crate) trait WithStatus {
|
|
|
|
|
fn status(&self) -> StatusCode;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
fn http_status(err: &TransportError) -> Option<StatusCode> {
|
|
|
|
|
match err {
|
|
|
|
|
TransportError::Http { status, .. } => Some(*status),
|
|
|
|
|
_ => None,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl WithStatus for Response {
|
|
|
|
|
fn status(&self) -> StatusCode {
|
|
|
|
|
self.status
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
impl WithStatus for StreamResponse {
|
|
|
|
|
fn status(&self) -> StatusCode {
|
|
|
|
|
self.status
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub(crate) async fn run_with_request_telemetry<T, F, Fut>(
|
|
|
|
|
policy: RetryPolicy,
|
|
|
|
|
telemetry: Option<Arc<dyn RequestTelemetry>>,
|
|
|
|
|
make_request: impl FnMut() -> Request,
|
|
|
|
|
send: F,
|
|
|
|
|
) -> Result<T, TransportError>
|
|
|
|
|
where
|
|
|
|
|
T: WithStatus,
|
|
|
|
|
F: Clone + Fn(Request) -> Fut,
|
|
|
|
|
Fut: Future<Output = Result<T, TransportError>>,
|
|
|
|
|
{
|
|
|
|
|
// Wraps `run_with_retry` to attach per-attempt request telemetry for both
|
|
|
|
|
// unary and streaming HTTP calls.
|
|
|
|
|
run_with_retry(policy, make_request, move |req, attempt| {
|
|
|
|
|
let telemetry = telemetry.clone();
|
|
|
|
|
let send = send.clone();
|
|
|
|
|
async move {
|
|
|
|
|
let start = Instant::now();
|
|
|
|
|
let result = send(req).await;
|
|
|
|
|
if let Some(t) = telemetry.as_ref() {
|
|
|
|
|
let (status, err) = match &result {
|
|
|
|
|
Ok(resp) => (Some(resp.status()), None),
|
|
|
|
|
Err(err) => (http_status(err), Some(err)),
|
|
|
|
|
};
|
|
|
|
|
t.on_request(attempt, status, err, start.elapsed());
|
|
|
|
|
}
|
|
|
|
|
result
|
|
|
|
|
}
|
|
|
|
|
})
|
|
|
|
|
.await
|
|
|
|
|
}
|