feat(otel, core): record turn TTFT and TTFM metrics in codex-core (#13630)

### Summary
This adds turn-level latency metrics for the first model output (TTFT,
time to first token) and the first completed agent message (TTFM, time to
first message).
- `codex.turn.ttft.duration_ms` starts at turn start and records on the
first output signal we see from the model. That includes normal
assistant text, reasoning deltas, and non-text outputs like tool-call
items.
- `codex.turn.ttfm.duration_ms` also starts at turn start, but it
records when the first agent message finishes streaming rather than when
its first delta arrives.

### Implementation notes
The timing is tracked in codex-core, not app-server, so the definition
stays consistent across CLI, TUI, and app-server clients.

I reused the existing turn lifecycle boundary that already drives
`codex.turn.e2e_duration_ms`, stored the turn start timestamp in turn
state, and record each metric once per turn.

I also wired the new metric names into the OTEL runtime metrics summary
so they show up in the same in-memory/debug snapshot path as the
existing timing metrics.
This commit is contained in:
Owen Lin 2026-03-06 10:23:48 -08:00 committed by GitHub
parent 6c98a59dbd
commit 3449e00bc9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 348 additions and 9 deletions

View file

@ -290,6 +290,9 @@ use crate::tools::sandboxing::ApprovalStore;
use crate::tools::spec::ToolsConfig;
use crate::tools::spec::ToolsConfigParams;
use crate::turn_diff_tracker::TurnDiffTracker;
use crate::turn_timing::TurnTimingState;
use crate::turn_timing::record_turn_ttfm_metric;
use crate::turn_timing::record_turn_ttft_metric;
use crate::unified_exec::UnifiedExecProcessManager;
use crate::util::backoff;
use crate::windows_sandbox::WindowsSandboxLevelExt;
@ -694,6 +697,7 @@ pub(crate) struct TurnContext {
pub(crate) dynamic_tools: Vec<DynamicToolSpec>,
pub(crate) turn_metadata_state: Arc<TurnMetadataState>,
pub(crate) turn_skills: TurnSkillsContext,
pub(crate) turn_timing_state: Arc<TurnTimingState>,
}
impl TurnContext {
pub(crate) fn model_context_window(&self) -> Option<i64> {
@ -783,6 +787,7 @@ impl TurnContext {
dynamic_tools: self.dynamic_tools.clone(),
turn_metadata_state: self.turn_metadata_state.clone(),
turn_skills: self.turn_skills.clone(),
turn_timing_state: Arc::clone(&self.turn_timing_state),
}
}
@ -1165,6 +1170,7 @@ impl Session {
dynamic_tools: session_configuration.dynamic_tools.clone(),
turn_metadata_state,
turn_skills: TurnSkillsContext::new(skills_outcome),
turn_timing_state: Arc::new(TurnTimingState::default()),
}
}
@ -2488,6 +2494,7 @@ impl Session {
turn_context: &TurnContext,
item: TurnItem,
) {
record_turn_ttfm_metric(turn_context, &item).await;
self.send_event(
turn_context,
EventMsg::ItemCompleted(ItemCompletedEvent {
@ -4988,6 +4995,7 @@ async fn spawn_review_thread(
truncation_policy: model_info.truncation_policy.into(),
turn_metadata_state,
turn_skills: TurnSkillsContext::new(parent_turn_context.turn_skills.outcome.clone()),
turn_timing_state: Arc::new(TurnTimingState::default()),
};
// Seed the child task with the review prompt as the initial user message.
@ -6583,6 +6591,7 @@ async fn try_run_sampling_request(
sess.services
.otel_manager
.record_responses(&handle_responses, &event);
record_turn_ttft_metric(&turn_context, &event).await;
match event {
ResponseEvent::Created => {}

View file

@ -111,6 +111,7 @@ pub mod terminal;
mod tools;
pub mod turn_diff_tracker;
mod turn_metadata;
mod turn_timing;
pub use rollout::ARCHIVED_SESSIONS_SUBDIR;
pub use rollout::INTERACTIVE_SESSION_SOURCES;
pub use rollout::RolloutRecorder;

View file

@ -7,6 +7,7 @@ mod user_shell;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use async_trait::async_trait;
use tokio::select;
@ -25,6 +26,7 @@ use crate::contextual_user_message::TURN_ABORTED_OPEN_TAG;
use crate::event_mapping::parse_turn_item;
use crate::models_manager::manager::ModelsManager;
use crate::protocol::EventMsg;
use crate::protocol::TokenUsage;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
use crate::protocol::TurnCompleteEvent;
@ -131,10 +133,21 @@ impl Session {
let task: Arc<dyn SessionTask> = Arc::new(task);
let task_kind = task.kind();
let span_name = task.span_name();
let started_at = Instant::now();
turn_context
.turn_timing_state
.mark_turn_started(started_at)
.await;
let token_usage_at_turn_start = self.total_token_usage().await.unwrap_or_default();
let cancellation_token = CancellationToken::new();
let done = Arc::new(Notify::new());
let timer = turn_context
.otel_manager
.start_timer("codex.turn.e2e_duration_ms", &[])
.ok();
let done_clone = Arc::clone(&done);
let handle = {
let session_ctx = Arc::new(SessionTaskContext::new(Arc::clone(self)));
@ -174,11 +187,6 @@ impl Session {
)
};
let timer = turn_context
.otel_manager
.start_timer("codex.turn.e2e_duration_ms", &[])
.ok();
let running_task = RunningTask {
done,
handle: Arc::new(AbortOnDropHandle::new(handle)),
@ -188,7 +196,8 @@ impl Session {
turn_context: Arc::clone(&turn_context),
_timer: timer,
};
self.register_new_active_task(running_task).await;
self.register_new_active_task(running_task, token_usage_at_turn_start)
.await;
}
pub async fn abort_all_tasks(self: &Arc<Self>, reason: TurnAbortReason) {
@ -319,11 +328,16 @@ impl Session {
self.send_event(turn_context.as_ref(), event).await;
}
async fn register_new_active_task(&self, task: RunningTask) {
let token_usage_at_turn_start = self.total_token_usage().await.unwrap_or_default();
async fn register_new_active_task(
&self,
task: RunningTask,
token_usage_at_turn_start: TokenUsage,
) {
let mut active = self.active_turn.lock().await;
let mut turn = ActiveTurn::default();
turn.turn_state.lock().await.token_usage_at_turn_start = token_usage_at_turn_start;
let mut turn_state = turn.turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
drop(turn_state);
turn.add_task(task);
*active = Some(turn);
}

View file

@ -0,0 +1,283 @@
use std::time::Duration;
use std::time::Instant;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ResponseItem;
use tokio::sync::Mutex;
use crate::ResponseEvent;
use crate::codex::TurnContext;
use crate::stream_events_utils::raw_assistant_output_text_from_item;
const TURN_TTFT_DURATION_METRIC: &str = "codex.turn.ttft.duration_ms";
const TURN_TTFM_DURATION_METRIC: &str = "codex.turn.ttfm.duration_ms";
/// Emits the turn-level time-to-first-token metric for `event`, if applicable.
///
/// Asks the turn's timing state whether `event` yields a TTFT duration; when it
/// does, the duration is recorded under `TURN_TTFT_DURATION_METRIC` with no
/// extra tags. When the timing state yields nothing, no metric is emitted.
pub(crate) async fn record_turn_ttft_metric(turn_context: &TurnContext, event: &ResponseEvent) {
    let ttft = turn_context
        .turn_timing_state
        .record_ttft_for_response_event(event)
        .await;

    if let Some(elapsed) = ttft {
        turn_context
            .otel_manager
            .record_duration(TURN_TTFT_DURATION_METRIC, elapsed, &[]);
    }
}
/// Emits the turn-level time-to-first-message metric for `item`, if applicable.
///
/// Asks the turn's timing state whether `item` yields a TTFM duration; when it
/// does, the duration is recorded under `TURN_TTFM_DURATION_METRIC` with no
/// extra tags. When the timing state yields nothing, no metric is emitted.
pub(crate) async fn record_turn_ttfm_metric(turn_context: &TurnContext, item: &TurnItem) {
    let ttfm = turn_context
        .turn_timing_state
        .record_ttfm_for_turn_item(item)
        .await;

    if let Some(elapsed) = ttfm {
        turn_context
            .otel_manager
            .record_duration(TURN_TTFM_DURATION_METRIC, elapsed, &[]);
    }
}
/// Timing bookkeeping for a single turn.
///
/// Tracks when the turn started and whether the first-token (TTFT) and
/// first-message (TTFM) durations have already been produced, so each is
/// yielded at most once after `mark_turn_started`. The default value
/// represents a turn that has not been started yet.
#[derive(Debug, Default)]
pub(crate) struct TurnTimingState {
/// Inner state behind an async (`tokio::sync`) mutex so it can be updated
/// through a shared reference.
state: Mutex<TurnTimingStateInner>,
}
/// Mutable timestamps backing `TurnTimingState`; all fields default to `None`.
#[derive(Debug, Default)]
struct TurnTimingStateInner {
/// Instant passed to `mark_turn_started`; `None` until a turn is started.
started_at: Option<Instant>,
/// Instant at which TTFT was recorded; `Some` means TTFT is already recorded.
first_token_at: Option<Instant>,
/// Instant at which TTFM was recorded; `Some` means TTFM is already recorded.
first_message_at: Option<Instant>,
}
impl TurnTimingState {
pub(crate) async fn mark_turn_started(&self, started_at: Instant) {
let mut state = self.state.lock().await;
state.started_at = Some(started_at);
state.first_token_at = None;
state.first_message_at = None;
}
pub(crate) async fn record_ttft_for_response_event(
&self,
event: &ResponseEvent,
) -> Option<Duration> {
if !response_event_records_turn_ttft(event) {
return None;
}
let mut state = self.state.lock().await;
state.record_turn_ttft()
}
pub(crate) async fn record_ttfm_for_turn_item(&self, item: &TurnItem) -> Option<Duration> {
if !matches!(item, TurnItem::AgentMessage(_)) {
return None;
}
let mut state = self.state.lock().await;
state.record_turn_ttfm()
}
}
impl TurnTimingStateInner {
    /// Records the time to first token for the current turn.
    ///
    /// Returns the elapsed time since the turn start on the first call after
    /// the turn was started. Returns `None` when TTFT was already recorded or
    /// when no turn start has been marked.
    fn record_turn_ttft(&mut self) -> Option<Duration> {
        Self::record_first_occurrence(self.started_at, &mut self.first_token_at)
    }

    /// Records the time to first completed message for the current turn.
    ///
    /// Returns the elapsed time since the turn start on the first call after
    /// the turn was started. Returns `None` when TTFM was already recorded or
    /// when no turn start has been marked.
    fn record_turn_ttfm(&mut self) -> Option<Duration> {
        Self::record_first_occurrence(self.started_at, &mut self.first_message_at)
    }

    /// Shared once-per-turn bookkeeping for TTFT and TTFM.
    ///
    /// `slot` holds the instant at which the milestone was first observed. If
    /// it is already filled, or `started_at` is `None`, nothing is modified and
    /// `None` is returned. Otherwise the slot is set to "now" and the elapsed
    /// time since `started_at` is returned.
    fn record_first_occurrence(
        started_at: Option<Instant>,
        slot: &mut Option<Instant>,
    ) -> Option<Duration> {
        if slot.is_some() {
            return None;
        }
        let started_at = started_at?;
        let observed_at = Instant::now();
        *slot = Some(observed_at);
        // `started_at` is supplied by the caller, so it is not guaranteed to be
        // earlier than `observed_at`. `duration_since` is documented as allowed
        // to panic in that case in future Rust versions; saturating keeps the
        // metric path panic-free and clamps the result to zero instead.
        Some(observed_at.saturating_duration_since(started_at))
    }
}
/// Decides whether a streamed response event counts as the first output signal
/// for TTFT purposes.
///
/// Text and reasoning deltas always count. Output-item events count only when
/// the carried item itself qualifies. Every other event variant is listed
/// explicitly as non-qualifying so that a newly added variant forces a decision
/// here at compile time.
fn response_event_records_turn_ttft(event: &ResponseEvent) -> bool {
    let item = match event {
        ResponseEvent::OutputTextDelta(_)
        | ResponseEvent::ReasoningSummaryDelta { .. }
        | ResponseEvent::ReasoningContentDelta { .. } => return true,

        ResponseEvent::Created
        | ResponseEvent::ServerModel(_)
        | ResponseEvent::ServerReasoningIncluded(_)
        | ResponseEvent::Completed { .. }
        | ResponseEvent::ReasoningSummaryPartAdded { .. }
        | ResponseEvent::RateLimits(_)
        | ResponseEvent::ModelsEtag(_) => return false,

        ResponseEvent::OutputItemAdded(item) | ResponseEvent::OutputItemDone(item) => item,
    };

    response_item_records_turn_ttft(item)
}
fn response_item_records_turn_ttft(item: &ResponseItem) -> bool {
match item {
ResponseItem::Message { .. } => {
raw_assistant_output_text_from_item(item).is_some_and(|text| !text.is_empty())
}
ResponseItem::Reasoning {
summary, content, ..
} => {
summary.iter().any(|entry| match entry {
codex_protocol::models::ReasoningItemReasoningSummary::SummaryText { text } => {
!text.is_empty()
}
}) || content.as_ref().is_some_and(|entries| {
entries.iter().any(|entry| match entry {
codex_protocol::models::ReasoningItemContent::ReasoningText { text }
| codex_protocol::models::ReasoningItemContent::Text { text } => {
!text.is_empty()
}
})
})
}
ResponseItem::LocalShellCall { .. }
| ResponseItem::FunctionCall { .. }
| ResponseItem::CustomToolCall { .. }
| ResponseItem::WebSearchCall { .. }
| ResponseItem::ImageGenerationCall { .. }
| ResponseItem::GhostSnapshot { .. }
| ResponseItem::Compaction { .. } => true,
ResponseItem::FunctionCallOutput { .. }
| ResponseItem::CustomToolCallOutput { .. }
| ResponseItem::Other => false,
}
}
#[cfg(test)]
mod tests {
    use codex_protocol::items::AgentMessageItem;
    use codex_protocol::items::TurnItem;
    use codex_protocol::models::ContentItem;
    use codex_protocol::models::FunctionCallOutputPayload;
    use codex_protocol::models::ResponseItem;
    use pretty_assertions::assert_eq;
    use std::time::Instant;

    use super::TurnTimingState;
    use super::response_item_records_turn_ttft;
    use crate::ResponseEvent;

    /// Builds an output-text delta event carrying `text`.
    fn text_delta(text: &str) -> ResponseEvent {
        ResponseEvent::OutputTextDelta(text.to_string())
    }

    /// Builds a completed agent message turn item with no content.
    fn agent_message(id: &str) -> TurnItem {
        TurnItem::AgentMessage(AgentMessageItem {
            id: id.to_string(),
            content: Vec::new(),
            phase: None,
        })
    }

    /// Builds an assistant message response item with a single text part.
    fn assistant_message(text: &str) -> ResponseItem {
        ResponseItem::Message {
            id: None,
            role: "assistant".to_string(),
            content: vec![ContentItem::OutputText {
                text: text.to_string(),
            }],
            end_turn: None,
            phase: None,
        }
    }

    #[tokio::test]
    async fn turn_timing_state_records_ttft_only_once_per_turn() {
        let timing = TurnTimingState::default();

        // Output before the turn has started must not produce a duration.
        let before_start = timing
            .record_ttft_for_response_event(&text_delta("hi"))
            .await;
        assert_eq!(before_start, None);

        timing.mark_turn_started(Instant::now()).await;

        // Lifecycle events are not output signals.
        let lifecycle_only = timing
            .record_ttft_for_response_event(&ResponseEvent::Created)
            .await;
        assert_eq!(lifecycle_only, None);

        // The first real output yields a duration...
        let first_output = timing
            .record_ttft_for_response_event(&text_delta("hi"))
            .await;
        assert!(first_output.is_some());

        // ...and any later output in the same turn does not.
        let later_output = timing
            .record_ttft_for_response_event(&text_delta("again"))
            .await;
        assert_eq!(later_output, None);
    }

    #[tokio::test]
    async fn turn_timing_state_records_ttfm_independently_of_ttft() {
        let timing = TurnTimingState::default();
        timing.mark_turn_started(Instant::now()).await;

        let ttft = timing
            .record_ttft_for_response_event(&text_delta("hi"))
            .await;
        assert!(ttft.is_some());

        // TTFM is still available after TTFT has been recorded.
        let first_message = timing
            .record_ttfm_for_turn_item(&agent_message("msg-1"))
            .await;
        assert!(first_message.is_some());

        let second_message = timing
            .record_ttfm_for_turn_item(&agent_message("msg-2"))
            .await;
        assert_eq!(second_message, None);
    }

    #[test]
    fn response_item_records_turn_ttft_for_first_output_signals() {
        let function_call = ResponseItem::FunctionCall {
            id: None,
            name: "shell".to_string(),
            arguments: "{}".to_string(),
            call_id: "call-1".to_string(),
        };
        assert!(response_item_records_turn_ttft(&function_call));

        let custom_tool_call = ResponseItem::CustomToolCall {
            id: None,
            status: None,
            call_id: "call-2".to_string(),
            name: "custom".to_string(),
            input: "echo hi".to_string(),
        };
        assert!(response_item_records_turn_ttft(&custom_tool_call));

        assert!(response_item_records_turn_ttft(&assistant_message("hello")));
    }

    #[test]
    fn response_item_records_turn_ttft_ignores_empty_non_output_items() {
        assert!(!response_item_records_turn_ttft(&assistant_message("")));

        let tool_output = ResponseItem::FunctionCallOutput {
            call_id: "call-1".to_string(),
            output: FunctionCallOutputPayload::from_text("ok".to_string()),
        };
        assert!(!response_item_records_turn_ttft(&tool_output));
    }
}

View file

@ -20,3 +20,5 @@ pub(crate) const RESPONSES_API_ENGINE_IAPI_TBT_DURATION_METRIC: &str =
"codex.responses_api_engine_iapi_tbt.duration_ms";
pub(crate) const RESPONSES_API_ENGINE_SERVICE_TBT_DURATION_METRIC: &str =
"codex.responses_api_engine_service_tbt.duration_ms";
pub(crate) const TURN_TTFT_DURATION_METRIC: &str = "codex.turn.ttft.duration_ms";
pub(crate) const TURN_TTFM_DURATION_METRIC: &str = "codex.turn.ttfm.duration_ms";

View file

@ -10,6 +10,8 @@ use crate::metrics::names::SSE_EVENT_COUNT_METRIC;
use crate::metrics::names::SSE_EVENT_DURATION_METRIC;
use crate::metrics::names::TOOL_CALL_COUNT_METRIC;
use crate::metrics::names::TOOL_CALL_DURATION_METRIC;
use crate::metrics::names::TURN_TTFM_DURATION_METRIC;
use crate::metrics::names::TURN_TTFT_DURATION_METRIC;
use crate::metrics::names::WEBSOCKET_EVENT_COUNT_METRIC;
use crate::metrics::names::WEBSOCKET_EVENT_DURATION_METRIC;
use crate::metrics::names::WEBSOCKET_REQUEST_COUNT_METRIC;
@ -49,6 +51,8 @@ pub struct RuntimeMetricsSummary {
pub responses_api_engine_service_ttft_ms: u64,
pub responses_api_engine_iapi_tbt_ms: u64,
pub responses_api_engine_service_tbt_ms: u64,
pub turn_ttft_ms: u64,
pub turn_ttfm_ms: u64,
}
impl RuntimeMetricsSummary {
@ -64,6 +68,8 @@ impl RuntimeMetricsSummary {
&& self.responses_api_engine_service_ttft_ms == 0
&& self.responses_api_engine_iapi_tbt_ms == 0
&& self.responses_api_engine_service_tbt_ms == 0
&& self.turn_ttft_ms == 0
&& self.turn_ttfm_ms == 0
}
pub fn merge(&mut self, other: Self) {
@ -90,6 +96,12 @@ impl RuntimeMetricsSummary {
if other.responses_api_engine_service_tbt_ms > 0 {
self.responses_api_engine_service_tbt_ms = other.responses_api_engine_service_tbt_ms;
}
if other.turn_ttft_ms > 0 {
self.turn_ttft_ms = other.turn_ttft_ms;
}
if other.turn_ttfm_ms > 0 {
self.turn_ttfm_ms = other.turn_ttfm_ms;
}
}
pub fn responses_api_summary(&self) -> RuntimeMetricsSummary {
@ -137,6 +149,8 @@ impl RuntimeMetricsSummary {
sum_histogram_ms(snapshot, RESPONSES_API_ENGINE_IAPI_TBT_DURATION_METRIC);
let responses_api_engine_service_tbt_ms =
sum_histogram_ms(snapshot, RESPONSES_API_ENGINE_SERVICE_TBT_DURATION_METRIC);
let turn_ttft_ms = sum_histogram_ms(snapshot, TURN_TTFT_DURATION_METRIC);
let turn_ttfm_ms = sum_histogram_ms(snapshot, TURN_TTFM_DURATION_METRIC);
Self {
tool_calls,
api_calls,
@ -149,6 +163,8 @@ impl RuntimeMetricsSummary {
responses_api_engine_service_ttft_ms,
responses_api_engine_iapi_tbt_ms,
responses_api_engine_service_tbt_ms,
turn_ttft_ms,
turn_ttfm_ms,
}
}
}

View file

@ -74,6 +74,16 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
.into(),
))));
manager.record_websocket_event(&ws_timing_response, Duration::from_millis(20));
manager.record_duration(
"codex.turn.ttft.duration_ms",
Duration::from_millis(95),
&[],
);
manager.record_duration(
"codex.turn.ttfm.duration_ms",
Duration::from_millis(180),
&[],
);
let summary = manager
.runtime_metrics_summary()
@ -105,6 +115,8 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<(
responses_api_engine_service_ttft_ms: 233,
responses_api_engine_iapi_tbt_ms: 377,
responses_api_engine_service_tbt_ms: 399,
turn_ttft_ms: 95,
turn_ttfm_ms: 180,
};
assert_eq!(summary, expected);

View file

@ -2580,6 +2580,8 @@ mod tests {
responses_api_engine_service_ttft_ms: 460,
responses_api_engine_iapi_tbt_ms: 1_180,
responses_api_engine_service_tbt_ms: 1_240,
turn_ttft_ms: 0,
turn_ttfm_ms: 0,
};
let cell = FinalMessageSeparator::new(Some(12), Some(summary));
let rendered = render_lines(&cell.display_lines(600));