From fcaed4cb88f068ffaa3cb9cfc69e7cf6605de0ce Mon Sep 17 00:00:00 2001 From: Anton Panasenko Date: Tue, 3 Feb 2026 18:04:07 -0800 Subject: [PATCH] feat: log websocket timing into runtime metrics (#10577) --- codex-rs/core/src/client.rs | 8 ++ codex-rs/core/src/lib.rs | 1 + .../core/tests/suite/client_websockets.rs | 77 +++++++++++++++++++ codex-rs/otel/src/metrics/names.rs | 4 + codex-rs/otel/src/metrics/runtime_metrics.rs | 12 +++ codex-rs/otel/src/traces/otel_manager.rs | 38 +++++++++ codex-rs/otel/tests/suite/runtime_summary.rs | 14 +++- codex-rs/tui/src/history_cell.rs | 14 +++- 8 files changed, 165 insertions(+), 3 deletions(-) diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index fcc308f38..314bfe7b4 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -72,6 +72,8 @@ use crate::transport_manager::TransportManager; pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible"; pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state"; pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata"; +pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str = + "x-responsesapi-include-timing-metrics"; #[derive(Debug, Default)] struct TurnMetadataCache { @@ -489,6 +491,12 @@ impl ModelClientSession { if needs_new { let mut headers = options.extra_headers.clone(); headers.extend(build_conversation_headers(options.conversation_id.clone())); + if self.state.config.features.enabled(Feature::RuntimeMetrics) { + headers.insert( + X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER, + HeaderValue::from_static("true"), + ); + } let websocket_telemetry = self.build_websocket_telemetry(); let new_conn: ApiWebSocketConnection = ApiWebSocketResponsesClient::new(api_provider, api_auth) diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index c9d67e19f..c5ac0d8f8 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -61,6 +61,7 @@ pub mod token_data; mod truncate; mod 
unified_exec; pub mod windows_sandbox; +pub use client::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER; pub use model_provider_info::DEFAULT_LMSTUDIO_PORT; pub use model_provider_info::DEFAULT_OLLAMA_PORT; pub use model_provider_info::LMSTUDIO_OSS_PROVIDER_ID; diff --git a/codex-rs/core/tests/suite/client_websockets.rs b/codex-rs/core/tests/suite/client_websockets.rs index 51c94edd7..9f6628344 100644 --- a/codex-rs/core/tests/suite/client_websockets.rs +++ b/codex-rs/core/tests/suite/client_websockets.rs @@ -10,6 +10,7 @@ use codex_core::ResponseEvent; use codex_core::ResponseItem; use codex_core::TransportManager; use codex_core::WireApi; +use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER; use codex_core::features::Feature; use codex_core::models_manager::manager::ModelsManager; use codex_core::protocol::SessionSource; @@ -102,6 +103,72 @@ async fn responses_websocket_emits_websocket_telemetry_events() { server.shutdown().await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn responses_websocket_includes_timing_metrics_header_when_runtime_metrics_enabled() { + skip_if_no_network!(); + + let server = start_websocket_server(vec![vec![vec![ + ev_response_created("resp-1"), + serde_json::json!({ + "type": "responsesapi.websocket_timing", + "timing_metrics": { + "responses_duration_excl_engine_and_client_tool_time_ms": 120, + "engine_service_total_ms": 450 + } + }), + ev_completed("resp-1"), + ]]]) + .await; + + let harness = websocket_harness_with_runtime_metrics(&server, true).await; + harness.otel_manager.reset_runtime_metrics(); + let mut session = harness.client.new_session(None); + let prompt = prompt_with_input(vec![message_item("hello")]); + + stream_until_complete(&mut session, &prompt).await; + tokio::time::sleep(Duration::from_millis(10)).await; + + let handshake = server.single_handshake(); + assert_eq!( + handshake.header(X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER), + Some("true".to_string()) + ); + + let summary = 
harness + .otel_manager + .runtime_metrics_summary() + .expect("runtime metrics summary"); + assert_eq!(summary.responses_api_overhead_ms, 120); + assert_eq!(summary.responses_api_inference_time_ms, 450); + + server.shutdown().await; +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn responses_websocket_omits_timing_metrics_header_when_runtime_metrics_disabled() { + skip_if_no_network!(); + + let server = start_websocket_server(vec![vec![vec![ + ev_response_created("resp-1"), + ev_completed("resp-1"), + ]]]) + .await; + + let harness = websocket_harness_with_runtime_metrics(&server, false).await; + let mut session = harness.client.new_session(None); + let prompt = prompt_with_input(vec![message_item("hello")]); + + stream_until_complete(&mut session, &prompt).await; + + let handshake = server.single_handshake(); + assert_eq!( + handshake.header(X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER), + None + ); + + server.shutdown().await; +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn responses_websocket_emits_reasoning_included_event() { skip_if_no_network!(); @@ -241,11 +308,21 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo { } async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness { + websocket_harness_with_runtime_metrics(server, false).await +} + +async fn websocket_harness_with_runtime_metrics( + server: &WebSocketTestServer, + runtime_metrics_enabled: bool, +) -> WebsocketTestHarness { let provider = websocket_provider(server); let codex_home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&codex_home).await; config.model = Some(MODEL.to_string()); config.features.enable(Feature::ResponsesWebsockets); + if runtime_metrics_enabled { + config.features.enable(Feature::RuntimeMetrics); + } let config = Arc::new(config); let model_info = ModelsManager::construct_model_info_offline(MODEL, &config); let conversation_id = ThreadId::new(); diff 
--git a/codex-rs/otel/src/metrics/names.rs b/codex-rs/otel/src/metrics/names.rs index b8ff9364a..97c4f4e6e 100644 --- a/codex-rs/otel/src/metrics/names.rs +++ b/codex-rs/otel/src/metrics/names.rs @@ -8,3 +8,7 @@ pub(crate) const WEBSOCKET_REQUEST_COUNT_METRIC: &str = "codex.websocket.request pub(crate) const WEBSOCKET_REQUEST_DURATION_METRIC: &str = "codex.websocket.request.duration_ms"; pub(crate) const WEBSOCKET_EVENT_COUNT_METRIC: &str = "codex.websocket.event"; pub(crate) const WEBSOCKET_EVENT_DURATION_METRIC: &str = "codex.websocket.event.duration_ms"; +pub(crate) const RESPONSES_API_OVERHEAD_DURATION_METRIC: &str = + "codex.responses_api_overhead.duration_ms"; +pub(crate) const RESPONSES_API_INFERENCE_TIME_DURATION_METRIC: &str = + "codex.responses_api_inference_time.duration_ms"; diff --git a/codex-rs/otel/src/metrics/runtime_metrics.rs b/codex-rs/otel/src/metrics/runtime_metrics.rs index dbd28010f..d59ff4b1b 100644 --- a/codex-rs/otel/src/metrics/runtime_metrics.rs +++ b/codex-rs/otel/src/metrics/runtime_metrics.rs @@ -1,5 +1,7 @@ use crate::metrics::names::API_CALL_COUNT_METRIC; use crate::metrics::names::API_CALL_DURATION_METRIC; +use crate::metrics::names::RESPONSES_API_INFERENCE_TIME_DURATION_METRIC; +use crate::metrics::names::RESPONSES_API_OVERHEAD_DURATION_METRIC; use crate::metrics::names::SSE_EVENT_COUNT_METRIC; use crate::metrics::names::SSE_EVENT_DURATION_METRIC; use crate::metrics::names::TOOL_CALL_COUNT_METRIC; @@ -32,6 +34,8 @@ pub struct RuntimeMetricsSummary { pub streaming_events: RuntimeMetricTotals, pub websocket_calls: RuntimeMetricTotals, pub websocket_events: RuntimeMetricTotals, + pub responses_api_overhead_ms: u64, + pub responses_api_inference_time_ms: u64, } impl RuntimeMetricsSummary { @@ -41,6 +45,8 @@ impl RuntimeMetricsSummary { && self.streaming_events.is_empty() && self.websocket_calls.is_empty() && self.websocket_events.is_empty() + && self.responses_api_overhead_ms == 0 + && self.responses_api_inference_time_ms == 0 } 
pub(crate) fn from_snapshot(snapshot: &ResourceMetrics) -> Self { @@ -64,12 +70,18 @@ impl RuntimeMetricsSummary { count: sum_counter(snapshot, WEBSOCKET_EVENT_COUNT_METRIC), duration_ms: sum_histogram_ms(snapshot, WEBSOCKET_EVENT_DURATION_METRIC), }; + let responses_api_overhead_ms = + sum_histogram_ms(snapshot, RESPONSES_API_OVERHEAD_DURATION_METRIC); + let responses_api_inference_time_ms = + sum_histogram_ms(snapshot, RESPONSES_API_INFERENCE_TIME_DURATION_METRIC); Self { tool_calls, api_calls, streaming_events, websocket_calls, websocket_events, + responses_api_overhead_ms, + responses_api_inference_time_ms, } } } diff --git a/codex-rs/otel/src/traces/otel_manager.rs b/codex-rs/otel/src/traces/otel_manager.rs index c585ec67d..bde9515c2 100644 --- a/codex-rs/otel/src/traces/otel_manager.rs +++ b/codex-rs/otel/src/traces/otel_manager.rs @@ -1,5 +1,7 @@ use crate::metrics::names::API_CALL_COUNT_METRIC; use crate::metrics::names::API_CALL_DURATION_METRIC; +use crate::metrics::names::RESPONSES_API_INFERENCE_TIME_DURATION_METRIC; +use crate::metrics::names::RESPONSES_API_OVERHEAD_DURATION_METRIC; use crate::metrics::names::SSE_EVENT_COUNT_METRIC; use crate::metrics::names::SSE_EVENT_DURATION_METRIC; use crate::metrics::names::TOOL_CALL_COUNT_METRIC; @@ -42,6 +44,10 @@ pub use crate::ToolDecisionSource; const SSE_UNKNOWN_KIND: &str = "unknown"; const WEBSOCKET_UNKNOWN_KIND: &str = "unknown"; +const RESPONSES_WEBSOCKET_TIMING_KIND: &str = "responsesapi.websocket_timing"; +const RESPONSES_WEBSOCKET_TIMING_METRICS_FIELD: &str = "timing_metrics"; +const RESPONSES_API_OVERHEAD_FIELD: &str = "responses_duration_excl_engine_and_client_tool_time_ms"; +const RESPONSES_API_INFERENCE_FIELD: &str = "engine_service_total_ms"; impl OtelManager { #[allow(clippy::too_many_arguments)] @@ -252,6 +258,9 @@ impl OtelManager { .get("type") .and_then(|value| value.as_str()) .map(std::string::ToString::to_string); + if kind.as_deref() == Some(RESPONSES_WEBSOCKET_TIMING_KIND) { + 
self.record_responses_websocket_timing_metrics(&value); + } if kind.as_deref() == Some("response.failed") { success = false; error_message = value @@ -651,6 +660,22 @@ impl OtelManager { ); } + fn record_responses_websocket_timing_metrics(&self, value: &serde_json::Value) { + let timing_metrics = value.get(RESPONSES_WEBSOCKET_TIMING_METRICS_FIELD); + + let overhead_value = + timing_metrics.and_then(|value| value.get(RESPONSES_API_OVERHEAD_FIELD)); + if let Some(duration) = duration_from_ms_value(overhead_value) { + self.record_duration(RESPONSES_API_OVERHEAD_DURATION_METRIC, duration, &[]); + } + + let inference_value = + timing_metrics.and_then(|value| value.get(RESPONSES_API_INFERENCE_FIELD)); + if let Some(duration) = duration_from_ms_value(inference_value) { + self.record_duration(RESPONSES_API_INFERENCE_TIME_DURATION_METRIC, duration, &[]); + } + } + fn responses_type(event: &ResponseEvent) -> String { match event { ResponseEvent::Created => "created".into(), @@ -689,3 +714,16 @@ impl OtelManager { fn timestamp() -> String { Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true) } + +fn duration_from_ms_value(value: Option<&serde_json::Value>) -> Option { + let value = value?; + let ms = value + .as_f64() + .or_else(|| value.as_i64().map(|v| v as f64)) + .or_else(|| value.as_u64().map(|v| v as f64))?; + if !ms.is_finite() || ms < 0.0 { + return None; + } + let clamped = ms.min(u64::MAX as f64); + Some(Duration::from_millis(clamped.round() as u64)) +} diff --git a/codex-rs/otel/tests/suite/runtime_summary.rs b/codex-rs/otel/tests/suite/runtime_summary.rs index 78aed8a0a..811c801fc 100644 --- a/codex-rs/otel/tests/suite/runtime_summary.rs +++ b/codex-rs/otel/tests/suite/runtime_summary.rs @@ -62,6 +62,14 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( r#"{"type":"response.created"}"#.into(), )))); manager.record_websocket_event(&ws_response, Duration::from_millis(80)); + let ws_timing_response: std::result::Result< + Option>, 
+ codex_api::ApiError, + > = Ok(Some(Ok(Message::Text( + r#"{"type":"responsesapi.websocket_timing","timing_metrics":{"responses_duration_excl_engine_and_client_tool_time_ms":124,"engine_service_total_ms":457}}"# + .into(), + )))); + manager.record_websocket_event(&ws_timing_response, Duration::from_millis(20)); let summary = manager .runtime_metrics_summary() @@ -84,9 +92,11 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( duration_ms: 400, }, websocket_events: RuntimeMetricTotals { - count: 1, - duration_ms: 80, + count: 2, + duration_ms: 100, }, + responses_api_overhead_ms: 124, + responses_api_inference_time_ms: 457, }; assert_eq!(summary, expected); diff --git a/codex-rs/tui/src/history_cell.rs b/codex-rs/tui/src/history_cell.rs index 49ccc481b..b85ca8f6b 100644 --- a/codex-rs/tui/src/history_cell.rs +++ b/codex-rs/tui/src/history_cell.rs @@ -2213,6 +2213,14 @@ fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option { summary.websocket_events.count )); } + if summary.responses_api_overhead_ms > 0 { + let duration = format_duration_ms(summary.responses_api_overhead_ms); + parts.push(format!("Responses API overhead: {duration}")); + } + if summary.responses_api_inference_time_ms > 0 { + let duration = format_duration_ms(summary.responses_api_inference_time_ms); + parts.push(format!("Responses API inference: {duration}")); + } if parts.is_empty() { None } else { @@ -2381,9 +2389,11 @@ mod tests { count: 4, duration_ms: 1_200, }, + responses_api_overhead_ms: 650, + responses_api_inference_time_ms: 1_940, }; let cell = FinalMessageSeparator::new(Some(12), Some(summary)); - let rendered = render_lines(&cell.display_lines(200)); + let rendered = render_lines(&cell.display_lines(300)); assert_eq!(rendered.len(), 1); assert!(!rendered[0].contains("Worked for")); @@ -2392,6 +2402,8 @@ mod tests { assert!(rendered[0].contains("WebSocket: 1 events send (700ms)")); assert!(rendered[0].contains("Streams: 6 events (900ms)")); 
assert!(rendered[0].contains("4 events received (1.2s)")); + assert!(rendered[0].contains("Responses API overhead: 650ms")); + assert!(rendered[0].contains("Responses API inference: 1.9s")); } #[test]