From 3449e00bc9a9e28abc7d0db57057f76731ca673b Mon Sep 17 00:00:00 2001 From: Owen Lin Date: Fri, 6 Mar 2026 10:23:48 -0800 Subject: [PATCH] feat(otel, core): record turn TTFT and TTFM metrics in codex-core (#13630) ### Summary This adds turn-level latency metrics for the first model output and the first completed agent message. - `codex.turn.ttft.duration_ms` (time to first token) starts at turn start and records on the first output signal we see from the model. That includes normal assistant text, reasoning deltas, and non-text outputs like tool-call items. - `codex.turn.ttfm.duration_ms` (time to first message) also starts at turn start, but it records when the first agent message finishes streaming rather than when its first delta arrives. ### Implementation notes The timing is tracked in codex-core, not app-server, so the definition stays consistent across CLI, TUI, and app-server clients. I reused the existing turn lifecycle boundary that already drives `codex.turn.e2e_duration_ms`, stored the turn start timestamp in turn state, and recorded each metric once per turn. I also wired the new metric names into the OTEL runtime metrics summary so they show up in the same in-memory/debug snapshot path as the existing timing metrics. 
--- codex-rs/core/src/codex.rs | 9 + codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/tasks/mod.rs | 32 ++- codex-rs/core/src/turn_timing.rs | 283 +++++++++++++++++++ codex-rs/otel/src/metrics/names.rs | 2 + codex-rs/otel/src/metrics/runtime_metrics.rs | 16 ++ codex-rs/otel/tests/suite/runtime_summary.rs | 12 + codex-rs/tui/src/history_cell.rs | 2 + 8 files changed, 348 insertions(+), 9 deletions(-) create mode 100644 codex-rs/core/src/turn_timing.rs diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7445b9fe0..c993eb3c6 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -290,6 +290,9 @@ use crate::tools::sandboxing::ApprovalStore; use crate::tools::spec::ToolsConfig; use crate::tools::spec::ToolsConfigParams; use crate::turn_diff_tracker::TurnDiffTracker; +use crate::turn_timing::TurnTimingState; +use crate::turn_timing::record_turn_ttfm_metric; +use crate::turn_timing::record_turn_ttft_metric; use crate::unified_exec::UnifiedExecProcessManager; use crate::util::backoff; use crate::windows_sandbox::WindowsSandboxLevelExt; @@ -694,6 +697,7 @@ pub(crate) struct TurnContext { pub(crate) dynamic_tools: Vec, pub(crate) turn_metadata_state: Arc, pub(crate) turn_skills: TurnSkillsContext, + pub(crate) turn_timing_state: Arc, } impl TurnContext { pub(crate) fn model_context_window(&self) -> Option { @@ -783,6 +787,7 @@ impl TurnContext { dynamic_tools: self.dynamic_tools.clone(), turn_metadata_state: self.turn_metadata_state.clone(), turn_skills: self.turn_skills.clone(), + turn_timing_state: Arc::clone(&self.turn_timing_state), } } @@ -1165,6 +1170,7 @@ impl Session { dynamic_tools: session_configuration.dynamic_tools.clone(), turn_metadata_state, turn_skills: TurnSkillsContext::new(skills_outcome), + turn_timing_state: Arc::new(TurnTimingState::default()), } } @@ -2488,6 +2494,7 @@ impl Session { turn_context: &TurnContext, item: TurnItem, ) { + record_turn_ttfm_metric(turn_context, &item).await; self.send_event( 
turn_context, EventMsg::ItemCompleted(ItemCompletedEvent { @@ -4988,6 +4995,7 @@ async fn spawn_review_thread( truncation_policy: model_info.truncation_policy.into(), turn_metadata_state, turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()), + turn_timing_state: Arc::new(TurnTimingState::default()), }; // Seed the child task with the review prompt as the initial user message. @@ -6583,6 +6591,7 @@ async fn try_run_sampling_request( sess.services .otel_manager .record_responses(&handle_responses, &event); + record_turn_ttft_metric(&turn_context, &event).await; match event { ResponseEvent::Created => {} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 9b822a85a..d96654bf4 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -111,6 +111,7 @@ pub mod terminal; mod tools; pub mod turn_diff_tracker; mod turn_metadata; +mod turn_timing; pub use rollout::ARCHIVED_SESSIONS_SUBDIR; pub use rollout::INTERACTIVE_SESSION_SOURCES; pub use rollout::RolloutRecorder; diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index a8feeb513..f6962edfd 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -7,6 +7,7 @@ mod user_shell; use std::sync::Arc; use std::time::Duration; +use std::time::Instant; use async_trait::async_trait; use tokio::select; @@ -25,6 +26,7 @@ use crate::contextual_user_message::TURN_ABORTED_OPEN_TAG; use crate::event_mapping::parse_turn_item; use crate::models_manager::manager::ModelsManager; use crate::protocol::EventMsg; +use crate::protocol::TokenUsage; use crate::protocol::TurnAbortReason; use crate::protocol::TurnAbortedEvent; use crate::protocol::TurnCompleteEvent; @@ -131,10 +133,21 @@ impl Session { let task: Arc = Arc::new(task); let task_kind = task.kind(); let span_name = task.span_name(); + let started_at = Instant::now(); + turn_context + .turn_timing_state + .mark_turn_started(started_at) + .await; + let 
token_usage_at_turn_start = self.total_token_usage().await.unwrap_or_default(); let cancellation_token = CancellationToken::new(); let done = Arc::new(Notify::new()); + let timer = turn_context + .otel_manager + .start_timer("codex.turn.e2e_duration_ms", &[]) + .ok(); + let done_clone = Arc::clone(&done); let handle = { let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self))); @@ -174,11 +187,6 @@ impl Session { ) }; - let timer = turn_context - .otel_manager - .start_timer("codex.turn.e2e_duration_ms", &[]) - .ok(); - let running_task = RunningTask { done, handle: Arc::new(AbortOnDropHandle::new(handle)), @@ -188,7 +196,8 @@ impl Session { turn_context: Arc::clone(&turn_context), _timer: timer, }; - self.register_new_active_task(running_task).await; + self.register_new_active_task(running_task, token_usage_at_turn_start) + .await; } pub async fn abort_all_tasks(self: &Arc, reason: TurnAbortReason) { @@ -319,11 +328,16 @@ impl Session { self.send_event(turn_context.as_ref(), event).await; } - async fn register_new_active_task(&self, task: RunningTask) { - let token_usage_at_turn_start = self.total_token_usage().await.unwrap_or_default(); + async fn register_new_active_task( + &self, + task: RunningTask, + token_usage_at_turn_start: TokenUsage, + ) { let mut active = self.active_turn.lock().await; let mut turn = ActiveTurn::default(); - turn.turn_state.lock().await.token_usage_at_turn_start = token_usage_at_turn_start; + let mut turn_state = turn.turn_state.lock().await; + turn_state.token_usage_at_turn_start = token_usage_at_turn_start; + drop(turn_state); turn.add_task(task); *active = Some(turn); } diff --git a/codex-rs/core/src/turn_timing.rs b/codex-rs/core/src/turn_timing.rs new file mode 100644 index 000000000..b825fde94 --- /dev/null +++ b/codex-rs/core/src/turn_timing.rs @@ -0,0 +1,283 @@ +use std::time::Duration; +use std::time::Instant; + +use codex_protocol::items::TurnItem; +use codex_protocol::models::ResponseItem; +use tokio::sync::Mutex; 
+ +use crate::ResponseEvent; +use crate::codex::TurnContext; +use crate::stream_events_utils::raw_assistant_output_text_from_item; + +const TURN_TTFT_DURATION_METRIC: &str = "codex.turn.ttft.duration_ms"; +const TURN_TTFM_DURATION_METRIC: &str = "codex.turn.ttfm.duration_ms"; + +pub(crate) async fn record_turn_ttft_metric(turn_context: &TurnContext, event: &ResponseEvent) { + let Some(duration) = turn_context + .turn_timing_state + .record_ttft_for_response_event(event) + .await + else { + return; + }; + turn_context + .otel_manager + .record_duration(TURN_TTFT_DURATION_METRIC, duration, &[]); +} + +pub(crate) async fn record_turn_ttfm_metric(turn_context: &TurnContext, item: &TurnItem) { + let Some(duration) = turn_context + .turn_timing_state + .record_ttfm_for_turn_item(item) + .await + else { + return; + }; + turn_context + .otel_manager + .record_duration(TURN_TTFM_DURATION_METRIC, duration, &[]); +} + +#[derive(Debug, Default)] +pub(crate) struct TurnTimingState { + state: Mutex, +} + +#[derive(Debug, Default)] +struct TurnTimingStateInner { + started_at: Option, + first_token_at: Option, + first_message_at: Option, +} + +impl TurnTimingState { + pub(crate) async fn mark_turn_started(&self, started_at: Instant) { + let mut state = self.state.lock().await; + state.started_at = Some(started_at); + state.first_token_at = None; + state.first_message_at = None; + } + + pub(crate) async fn record_ttft_for_response_event( + &self, + event: &ResponseEvent, + ) -> Option { + if !response_event_records_turn_ttft(event) { + return None; + } + let mut state = self.state.lock().await; + state.record_turn_ttft() + } + + pub(crate) async fn record_ttfm_for_turn_item(&self, item: &TurnItem) -> Option { + if !matches!(item, TurnItem::AgentMessage(_)) { + return None; + } + let mut state = self.state.lock().await; + state.record_turn_ttfm() + } +} + +impl TurnTimingStateInner { + fn record_turn_ttft(&mut self) -> Option { + if self.first_token_at.is_some() { + return None; + } 
+ let started_at = self.started_at?; + let first_token_at = Instant::now(); + self.first_token_at = Some(first_token_at); + Some(first_token_at.duration_since(started_at)) + } + + fn record_turn_ttfm(&mut self) -> Option { + if self.first_message_at.is_some() { + return None; + } + let started_at = self.started_at?; + let first_message_at = Instant::now(); + self.first_message_at = Some(first_message_at); + Some(first_message_at.duration_since(started_at)) + } +} + +fn response_event_records_turn_ttft(event: &ResponseEvent) -> bool { + match event { + ResponseEvent::OutputItemDone(item) | ResponseEvent::OutputItemAdded(item) => { + response_item_records_turn_ttft(item) + } + ResponseEvent::OutputTextDelta(_) + | ResponseEvent::ReasoningSummaryDelta { .. } + | ResponseEvent::ReasoningContentDelta { .. } => true, + ResponseEvent::Created + | ResponseEvent::ServerModel(_) + | ResponseEvent::ServerReasoningIncluded(_) + | ResponseEvent::Completed { .. } + | ResponseEvent::ReasoningSummaryPartAdded { .. } + | ResponseEvent::RateLimits(_) + | ResponseEvent::ModelsEtag(_) => false, + } +} + +fn response_item_records_turn_ttft(item: &ResponseItem) -> bool { + match item { + ResponseItem::Message { .. } => { + raw_assistant_output_text_from_item(item).is_some_and(|text| !text.is_empty()) + } + ResponseItem::Reasoning { + summary, content, .. + } => { + summary.iter().any(|entry| match entry { + codex_protocol::models::ReasoningItemReasoningSummary::SummaryText { text } => { + !text.is_empty() + } + }) || content.as_ref().is_some_and(|entries| { + entries.iter().any(|entry| match entry { + codex_protocol::models::ReasoningItemContent::ReasoningText { text } + | codex_protocol::models::ReasoningItemContent::Text { text } => { + !text.is_empty() + } + }) + }) + } + ResponseItem::LocalShellCall { .. } + | ResponseItem::FunctionCall { .. } + | ResponseItem::CustomToolCall { .. } + | ResponseItem::WebSearchCall { .. } + | ResponseItem::ImageGenerationCall { .. 
} + | ResponseItem::GhostSnapshot { .. } + | ResponseItem::Compaction { .. } => true, + ResponseItem::FunctionCallOutput { .. } + | ResponseItem::CustomToolCallOutput { .. } + | ResponseItem::Other => false, + } +} + +#[cfg(test)] +mod tests { + use codex_protocol::items::AgentMessageItem; + use codex_protocol::items::TurnItem; + use codex_protocol::models::ContentItem; + use codex_protocol::models::FunctionCallOutputPayload; + use codex_protocol::models::ResponseItem; + use pretty_assertions::assert_eq; + use std::time::Instant; + + use super::TurnTimingState; + use super::response_item_records_turn_ttft; + use crate::ResponseEvent; + + #[tokio::test] + async fn turn_timing_state_records_ttft_only_once_per_turn() { + let state = TurnTimingState::default(); + assert_eq!( + state + .record_ttft_for_response_event(&ResponseEvent::OutputTextDelta("hi".to_string())) + .await, + None + ); + + state.mark_turn_started(Instant::now()).await; + assert_eq!( + state + .record_ttft_for_response_event(&ResponseEvent::Created) + .await, + None + ); + assert!( + state + .record_ttft_for_response_event(&ResponseEvent::OutputTextDelta("hi".to_string())) + .await + .is_some() + ); + assert_eq!( + state + .record_ttft_for_response_event(&ResponseEvent::OutputTextDelta( + "again".to_string() + )) + .await, + None + ); + } + + #[tokio::test] + async fn turn_timing_state_records_ttfm_independently_of_ttft() { + let state = TurnTimingState::default(); + state.mark_turn_started(Instant::now()).await; + + assert!( + state + .record_ttft_for_response_event(&ResponseEvent::OutputTextDelta("hi".to_string())) + .await + .is_some() + ); + assert!( + state + .record_ttfm_for_turn_item(&TurnItem::AgentMessage(AgentMessageItem { + id: "msg-1".to_string(), + content: Vec::new(), + phase: None, + })) + .await + .is_some() + ); + assert_eq!( + state + .record_ttfm_for_turn_item(&TurnItem::AgentMessage(AgentMessageItem { + id: "msg-2".to_string(), + content: Vec::new(), + phase: None, + })) + .await, 
+ None + ); + } + + #[test] + fn response_item_records_turn_ttft_for_first_output_signals() { + assert!(response_item_records_turn_ttft( + &ResponseItem::FunctionCall { + id: None, + name: "shell".to_string(), + arguments: "{}".to_string(), + call_id: "call-1".to_string(), + } + )); + assert!(response_item_records_turn_ttft( + &ResponseItem::CustomToolCall { + id: None, + status: None, + call_id: "call-2".to_string(), + name: "custom".to_string(), + input: "echo hi".to_string(), + } + )); + assert!(response_item_records_turn_ttft(&ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: "hello".to_string(), + }], + end_turn: None, + phase: None, + })); + } + + #[test] + fn response_item_records_turn_ttft_ignores_empty_non_output_items() { + assert!(!response_item_records_turn_ttft(&ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![ContentItem::OutputText { + text: String::new(), + }], + end_turn: None, + phase: None, + })); + assert!(!response_item_records_turn_ttft( + &ResponseItem::FunctionCallOutput { + call_id: "call-1".to_string(), + output: FunctionCallOutputPayload::from_text("ok".to_string()), + } + )); + } +} diff --git a/codex-rs/otel/src/metrics/names.rs b/codex-rs/otel/src/metrics/names.rs index 76b46d245..932f94759 100644 --- a/codex-rs/otel/src/metrics/names.rs +++ b/codex-rs/otel/src/metrics/names.rs @@ -20,3 +20,5 @@ pub(crate) const RESPONSES_API_ENGINE_IAPI_TBT_DURATION_METRIC: &str = "codex.responses_api_engine_iapi_tbt.duration_ms"; pub(crate) const RESPONSES_API_ENGINE_SERVICE_TBT_DURATION_METRIC: &str = "codex.responses_api_engine_service_tbt.duration_ms"; +pub(crate) const TURN_TTFT_DURATION_METRIC: &str = "codex.turn.ttft.duration_ms"; +pub(crate) const TURN_TTFM_DURATION_METRIC: &str = "codex.turn.ttfm.duration_ms"; diff --git a/codex-rs/otel/src/metrics/runtime_metrics.rs b/codex-rs/otel/src/metrics/runtime_metrics.rs index dcac367d8..93f457a61 
100644 --- a/codex-rs/otel/src/metrics/runtime_metrics.rs +++ b/codex-rs/otel/src/metrics/runtime_metrics.rs @@ -10,6 +10,8 @@ use crate::metrics::names::SSE_EVENT_COUNT_METRIC; use crate::metrics::names::SSE_EVENT_DURATION_METRIC; use crate::metrics::names::TOOL_CALL_COUNT_METRIC; use crate::metrics::names::TOOL_CALL_DURATION_METRIC; +use crate::metrics::names::TURN_TTFM_DURATION_METRIC; +use crate::metrics::names::TURN_TTFT_DURATION_METRIC; use crate::metrics::names::WEBSOCKET_EVENT_COUNT_METRIC; use crate::metrics::names::WEBSOCKET_EVENT_DURATION_METRIC; use crate::metrics::names::WEBSOCKET_REQUEST_COUNT_METRIC; @@ -49,6 +51,8 @@ pub struct RuntimeMetricsSummary { pub responses_api_engine_service_ttft_ms: u64, pub responses_api_engine_iapi_tbt_ms: u64, pub responses_api_engine_service_tbt_ms: u64, + pub turn_ttft_ms: u64, + pub turn_ttfm_ms: u64, } impl RuntimeMetricsSummary { @@ -64,6 +68,8 @@ impl RuntimeMetricsSummary { && self.responses_api_engine_service_ttft_ms == 0 && self.responses_api_engine_iapi_tbt_ms == 0 && self.responses_api_engine_service_tbt_ms == 0 + && self.turn_ttft_ms == 0 + && self.turn_ttfm_ms == 0 } pub fn merge(&mut self, other: Self) { @@ -90,6 +96,12 @@ impl RuntimeMetricsSummary { if other.responses_api_engine_service_tbt_ms > 0 { self.responses_api_engine_service_tbt_ms = other.responses_api_engine_service_tbt_ms; } + if other.turn_ttft_ms > 0 { + self.turn_ttft_ms = other.turn_ttft_ms; + } + if other.turn_ttfm_ms > 0 { + self.turn_ttfm_ms = other.turn_ttfm_ms; + } } pub fn responses_api_summary(&self) -> RuntimeMetricsSummary { @@ -137,6 +149,8 @@ impl RuntimeMetricsSummary { sum_histogram_ms(snapshot, RESPONSES_API_ENGINE_IAPI_TBT_DURATION_METRIC); let responses_api_engine_service_tbt_ms = sum_histogram_ms(snapshot, RESPONSES_API_ENGINE_SERVICE_TBT_DURATION_METRIC); + let turn_ttft_ms = sum_histogram_ms(snapshot, TURN_TTFT_DURATION_METRIC); + let turn_ttfm_ms = sum_histogram_ms(snapshot, TURN_TTFM_DURATION_METRIC); Self { 
tool_calls, api_calls, @@ -149,6 +163,8 @@ impl RuntimeMetricsSummary { responses_api_engine_service_ttft_ms, responses_api_engine_iapi_tbt_ms, responses_api_engine_service_tbt_ms, + turn_ttft_ms, + turn_ttfm_ms, } } } diff --git a/codex-rs/otel/tests/suite/runtime_summary.rs b/codex-rs/otel/tests/suite/runtime_summary.rs index 4020e70b4..8857387d5 100644 --- a/codex-rs/otel/tests/suite/runtime_summary.rs +++ b/codex-rs/otel/tests/suite/runtime_summary.rs @@ -74,6 +74,16 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( .into(), )))); manager.record_websocket_event(&ws_timing_response, Duration::from_millis(20)); + manager.record_duration( + "codex.turn.ttft.duration_ms", + Duration::from_millis(95), + &[], + ); + manager.record_duration( + "codex.turn.ttfm.duration_ms", + Duration::from_millis(180), + &[], + ); let summary = manager .runtime_metrics_summary() @@ -105,6 +115,8 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( responses_api_engine_service_ttft_ms: 233, responses_api_engine_iapi_tbt_ms: 377, responses_api_engine_service_tbt_ms: 399, + turn_ttft_ms: 95, + turn_ttfm_ms: 180, }; assert_eq!(summary, expected); diff --git a/codex-rs/tui/src/history_cell.rs b/codex-rs/tui/src/history_cell.rs index 3a568a0f8..1167a391e 100644 --- a/codex-rs/tui/src/history_cell.rs +++ b/codex-rs/tui/src/history_cell.rs @@ -2580,6 +2580,8 @@ mod tests { responses_api_engine_service_ttft_ms: 460, responses_api_engine_iapi_tbt_ms: 1_180, responses_api_engine_service_tbt_ms: 1_240, + turn_ttft_ms: 0, + turn_ttfm_ms: 0, }; let cell = FinalMessageSeparator::new(Some(12), Some(summary)); let rendered = render_lines(&cell.display_lines(600));