feat: log webscocket timing into runtime metrics (#10577)
This commit is contained in:
parent
a9eb766f33
commit
fcaed4cb88
8 changed files with 165 additions and 3 deletions
|
|
@ -72,6 +72,8 @@ use crate::transport_manager::TransportManager;
|
|||
pub const WEB_SEARCH_ELIGIBLE_HEADER: &str = "x-oai-web-search-eligible";
|
||||
pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state";
|
||||
pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata";
|
||||
pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str =
|
||||
"x-responsesapi-include-timing-metrics";
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
struct TurnMetadataCache {
|
||||
|
|
@ -489,6 +491,12 @@ impl ModelClientSession {
|
|||
if needs_new {
|
||||
let mut headers = options.extra_headers.clone();
|
||||
headers.extend(build_conversation_headers(options.conversation_id.clone()));
|
||||
if self.state.config.features.enabled(Feature::RuntimeMetrics) {
|
||||
headers.insert(
|
||||
X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER,
|
||||
HeaderValue::from_static("true"),
|
||||
);
|
||||
}
|
||||
let websocket_telemetry = self.build_websocket_telemetry();
|
||||
let new_conn: ApiWebSocketConnection =
|
||||
ApiWebSocketResponsesClient::new(api_provider, api_auth)
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@ pub mod token_data;
|
|||
mod truncate;
|
||||
mod unified_exec;
|
||||
pub mod windows_sandbox;
|
||||
pub use client::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
|
||||
pub use model_provider_info::DEFAULT_LMSTUDIO_PORT;
|
||||
pub use model_provider_info::DEFAULT_OLLAMA_PORT;
|
||||
pub use model_provider_info::LMSTUDIO_OSS_PROVIDER_ID;
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ use codex_core::ResponseEvent;
|
|||
use codex_core::ResponseItem;
|
||||
use codex_core::TransportManager;
|
||||
use codex_core::WireApi;
|
||||
use codex_core::X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::models_manager::manager::ModelsManager;
|
||||
use codex_core::protocol::SessionSource;
|
||||
|
|
@ -102,6 +103,72 @@ async fn responses_websocket_emits_websocket_telemetry_events() {
|
|||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_includes_timing_metrics_header_when_runtime_metrics_enabled() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
serde_json::json!({
|
||||
"type": "responsesapi.websocket_timing",
|
||||
"timing_metrics": {
|
||||
"responses_duration_excl_engine_and_client_tool_time_ms": 120,
|
||||
"engine_service_total_ms": 450
|
||||
}
|
||||
}),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness_with_runtime_metrics(&server, true).await;
|
||||
harness.otel_manager.reset_runtime_metrics();
|
||||
let mut session = harness.client.new_session(None);
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
|
||||
stream_until_complete(&mut session, &prompt).await;
|
||||
tokio::time::sleep(Duration::from_millis(10)).await;
|
||||
|
||||
let handshake = server.single_handshake();
|
||||
assert_eq!(
|
||||
handshake.header(X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER),
|
||||
Some("true".to_string())
|
||||
);
|
||||
|
||||
let summary = harness
|
||||
.otel_manager
|
||||
.runtime_metrics_summary()
|
||||
.expect("runtime metrics summary");
|
||||
assert_eq!(summary.responses_api_overhead_ms, 120);
|
||||
assert_eq!(summary.responses_api_inference_time_ms, 450);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_omits_timing_metrics_header_when_runtime_metrics_disabled() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_websocket_server(vec![vec![vec![
|
||||
ev_response_created("resp-1"),
|
||||
ev_completed("resp-1"),
|
||||
]]])
|
||||
.await;
|
||||
|
||||
let harness = websocket_harness_with_runtime_metrics(&server, false).await;
|
||||
let mut session = harness.client.new_session(None);
|
||||
let prompt = prompt_with_input(vec![message_item("hello")]);
|
||||
|
||||
stream_until_complete(&mut session, &prompt).await;
|
||||
|
||||
let handshake = server.single_handshake();
|
||||
assert_eq!(
|
||||
handshake.header(X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER),
|
||||
None
|
||||
);
|
||||
|
||||
server.shutdown().await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn responses_websocket_emits_reasoning_included_event() {
|
||||
skip_if_no_network!();
|
||||
|
|
@ -241,11 +308,21 @@ fn websocket_provider(server: &WebSocketTestServer) -> ModelProviderInfo {
|
|||
}
|
||||
|
||||
async fn websocket_harness(server: &WebSocketTestServer) -> WebsocketTestHarness {
|
||||
websocket_harness_with_runtime_metrics(server, false).await
|
||||
}
|
||||
|
||||
async fn websocket_harness_with_runtime_metrics(
|
||||
server: &WebSocketTestServer,
|
||||
runtime_metrics_enabled: bool,
|
||||
) -> WebsocketTestHarness {
|
||||
let provider = websocket_provider(server);
|
||||
let codex_home = TempDir::new().unwrap();
|
||||
let mut config = load_default_config_for_test(&codex_home).await;
|
||||
config.model = Some(MODEL.to_string());
|
||||
config.features.enable(Feature::ResponsesWebsockets);
|
||||
if runtime_metrics_enabled {
|
||||
config.features.enable(Feature::RuntimeMetrics);
|
||||
}
|
||||
let config = Arc::new(config);
|
||||
let model_info = ModelsManager::construct_model_info_offline(MODEL, &config);
|
||||
let conversation_id = ThreadId::new();
|
||||
|
|
|
|||
|
|
@ -8,3 +8,7 @@ pub(crate) const WEBSOCKET_REQUEST_COUNT_METRIC: &str = "codex.websocket.request
|
|||
pub(crate) const WEBSOCKET_REQUEST_DURATION_METRIC: &str = "codex.websocket.request.duration_ms";
|
||||
pub(crate) const WEBSOCKET_EVENT_COUNT_METRIC: &str = "codex.websocket.event";
|
||||
pub(crate) const WEBSOCKET_EVENT_DURATION_METRIC: &str = "codex.websocket.event.duration_ms";
|
||||
pub(crate) const RESPONSES_API_OVERHEAD_DURATION_METRIC: &str =
|
||||
"codex.responses_api_overhead.duration_ms";
|
||||
pub(crate) const RESPONSES_API_INFERENCE_TIME_DURATION_METRIC: &str =
|
||||
"codex.responses_api_inference_time.duration_ms";
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
use crate::metrics::names::API_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::API_CALL_DURATION_METRIC;
|
||||
use crate::metrics::names::RESPONSES_API_INFERENCE_TIME_DURATION_METRIC;
|
||||
use crate::metrics::names::RESPONSES_API_OVERHEAD_DURATION_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
|
||||
|
|
@ -32,6 +34,8 @@ pub struct RuntimeMetricsSummary {
|
|||
pub streaming_events: RuntimeMetricTotals,
|
||||
pub websocket_calls: RuntimeMetricTotals,
|
||||
pub websocket_events: RuntimeMetricTotals,
|
||||
pub responses_api_overhead_ms: u64,
|
||||
pub responses_api_inference_time_ms: u64,
|
||||
}
|
||||
|
||||
impl RuntimeMetricsSummary {
|
||||
|
|
@ -41,6 +45,8 @@ impl RuntimeMetricsSummary {
|
|||
&& self.streaming_events.is_empty()
|
||||
&& self.websocket_calls.is_empty()
|
||||
&& self.websocket_events.is_empty()
|
||||
&& self.responses_api_overhead_ms == 0
|
||||
&& self.responses_api_inference_time_ms == 0
|
||||
}
|
||||
|
||||
pub(crate) fn from_snapshot(snapshot: &ResourceMetrics) -> Self {
|
||||
|
|
@ -64,12 +70,18 @@ impl RuntimeMetricsSummary {
|
|||
count: sum_counter(snapshot, WEBSOCKET_EVENT_COUNT_METRIC),
|
||||
duration_ms: sum_histogram_ms(snapshot, WEBSOCKET_EVENT_DURATION_METRIC),
|
||||
};
|
||||
let responses_api_overhead_ms =
|
||||
sum_histogram_ms(snapshot, RESPONSES_API_OVERHEAD_DURATION_METRIC);
|
||||
let responses_api_inference_time_ms =
|
||||
sum_histogram_ms(snapshot, RESPONSES_API_INFERENCE_TIME_DURATION_METRIC);
|
||||
Self {
|
||||
tool_calls,
|
||||
api_calls,
|
||||
streaming_events,
|
||||
websocket_calls,
|
||||
websocket_events,
|
||||
responses_api_overhead_ms,
|
||||
responses_api_inference_time_ms,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
use crate::metrics::names::API_CALL_COUNT_METRIC;
|
||||
use crate::metrics::names::API_CALL_DURATION_METRIC;
|
||||
use crate::metrics::names::RESPONSES_API_INFERENCE_TIME_DURATION_METRIC;
|
||||
use crate::metrics::names::RESPONSES_API_OVERHEAD_DURATION_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
|
||||
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
|
||||
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
|
||||
|
|
@ -42,6 +44,10 @@ pub use crate::ToolDecisionSource;
|
|||
|
||||
const SSE_UNKNOWN_KIND: &str = "unknown";
|
||||
const WEBSOCKET_UNKNOWN_KIND: &str = "unknown";
|
||||
const RESPONSES_WEBSOCKET_TIMING_KIND: &str = "responsesapi.websocket_timing";
|
||||
const RESPONSES_WEBSOCKET_TIMING_METRICS_FIELD: &str = "timing_metrics";
|
||||
const RESPONSES_API_OVERHEAD_FIELD: &str = "responses_duration_excl_engine_and_client_tool_time_ms";
|
||||
const RESPONSES_API_INFERENCE_FIELD: &str = "engine_service_total_ms";
|
||||
|
||||
impl OtelManager {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
|
|
@ -252,6 +258,9 @@ impl OtelManager {
|
|||
.get("type")
|
||||
.and_then(|value| value.as_str())
|
||||
.map(std::string::ToString::to_string);
|
||||
if kind.as_deref() == Some(RESPONSES_WEBSOCKET_TIMING_KIND) {
|
||||
self.record_responses_websocket_timing_metrics(&value);
|
||||
}
|
||||
if kind.as_deref() == Some("response.failed") {
|
||||
success = false;
|
||||
error_message = value
|
||||
|
|
@ -651,6 +660,22 @@ impl OtelManager {
|
|||
);
|
||||
}
|
||||
|
||||
fn record_responses_websocket_timing_metrics(&self, value: &serde_json::Value) {
|
||||
let timing_metrics = value.get(RESPONSES_WEBSOCKET_TIMING_METRICS_FIELD);
|
||||
|
||||
let overhead_value =
|
||||
timing_metrics.and_then(|value| value.get(RESPONSES_API_OVERHEAD_FIELD));
|
||||
if let Some(duration) = duration_from_ms_value(overhead_value) {
|
||||
self.record_duration(RESPONSES_API_OVERHEAD_DURATION_METRIC, duration, &[]);
|
||||
}
|
||||
|
||||
let inference_value =
|
||||
timing_metrics.and_then(|value| value.get(RESPONSES_API_INFERENCE_FIELD));
|
||||
if let Some(duration) = duration_from_ms_value(inference_value) {
|
||||
self.record_duration(RESPONSES_API_INFERENCE_TIME_DURATION_METRIC, duration, &[]);
|
||||
}
|
||||
}
|
||||
|
||||
fn responses_type(event: &ResponseEvent) -> String {
|
||||
match event {
|
||||
ResponseEvent::Created => "created".into(),
|
||||
|
|
@ -689,3 +714,16 @@ impl OtelManager {
|
|||
fn timestamp() -> String {
|
||||
Utc::now().to_rfc3339_opts(SecondsFormat::Millis, true)
|
||||
}
|
||||
|
||||
fn duration_from_ms_value(value: Option<&serde_json::Value>) -> Option<Duration> {
|
||||
let value = value?;
|
||||
let ms = value
|
||||
.as_f64()
|
||||
.or_else(|| value.as_i64().map(|v| v as f64))
|
||||
.or_else(|| value.as_u64().map(|v| v as f64))?;
|
||||
if !ms.is_finite() || ms < 0.0 {
|
||||
return None;
|
||||
}
|
||||
let clamped = ms.min(u64::MAX as f64);
|
||||
Some(Duration::from_millis(clamped.round() as u64))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -62,6 +62,14 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
|||
r#"{"type":"response.created"}"#.into(),
|
||||
))));
|
||||
manager.record_websocket_event(&ws_response, Duration::from_millis(80));
|
||||
let ws_timing_response: std::result::Result<
|
||||
Option<std::result::Result<Message, tokio_tungstenite::tungstenite::Error>>,
|
||||
codex_api::ApiError,
|
||||
> = Ok(Some(Ok(Message::Text(
|
||||
r#"{"type":"responsesapi.websocket_timing","timing_metrics":{"responses_duration_excl_engine_and_client_tool_time_ms":124,"engine_service_total_ms":457}}"#
|
||||
.into(),
|
||||
))));
|
||||
manager.record_websocket_event(&ws_timing_response, Duration::from_millis(20));
|
||||
|
||||
let summary = manager
|
||||
.runtime_metrics_summary()
|
||||
|
|
@ -84,9 +92,11 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
|
|||
duration_ms: 400,
|
||||
},
|
||||
websocket_events: RuntimeMetricTotals {
|
||||
count: 1,
|
||||
duration_ms: 80,
|
||||
count: 2,
|
||||
duration_ms: 100,
|
||||
},
|
||||
responses_api_overhead_ms: 124,
|
||||
responses_api_inference_time_ms: 457,
|
||||
};
|
||||
assert_eq!(summary, expected);
|
||||
|
||||
|
|
|
|||
|
|
@ -2213,6 +2213,14 @@ fn runtime_metrics_label(summary: RuntimeMetricsSummary) -> Option<String> {
|
|||
summary.websocket_events.count
|
||||
));
|
||||
}
|
||||
if summary.responses_api_overhead_ms > 0 {
|
||||
let duration = format_duration_ms(summary.responses_api_overhead_ms);
|
||||
parts.push(format!("Responses API overhead: {duration}"));
|
||||
}
|
||||
if summary.responses_api_inference_time_ms > 0 {
|
||||
let duration = format_duration_ms(summary.responses_api_inference_time_ms);
|
||||
parts.push(format!("Responses API inference: {duration}"));
|
||||
}
|
||||
if parts.is_empty() {
|
||||
None
|
||||
} else {
|
||||
|
|
@ -2381,9 +2389,11 @@ mod tests {
|
|||
count: 4,
|
||||
duration_ms: 1_200,
|
||||
},
|
||||
responses_api_overhead_ms: 650,
|
||||
responses_api_inference_time_ms: 1_940,
|
||||
};
|
||||
let cell = FinalMessageSeparator::new(Some(12), Some(summary));
|
||||
let rendered = render_lines(&cell.display_lines(200));
|
||||
let rendered = render_lines(&cell.display_lines(300));
|
||||
|
||||
assert_eq!(rendered.len(), 1);
|
||||
assert!(!rendered[0].contains("Worked for"));
|
||||
|
|
@ -2392,6 +2402,8 @@ mod tests {
|
|||
assert!(rendered[0].contains("WebSocket: 1 events send (700ms)"));
|
||||
assert!(rendered[0].contains("Streams: 6 events (900ms)"));
|
||||
assert!(rendered[0].contains("4 events received (1.2s)"));
|
||||
assert!(rendered[0].contains("Responses API overhead: 650ms"));
|
||||
assert!(rendered[0].contains("Responses API inference: 1.9s"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue