diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 9b9e0214a..0ac2b14f4 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3238,12 +3238,9 @@ impl Session { turn_context: &TurnContext, token_usage: Option<&TokenUsage>, ) { - { + if let Some(token_usage) = token_usage { let mut state = self.state.lock().await; - if let Some(token_usage) = token_usage { - state - .update_token_info_from_usage(token_usage, turn_context.model_context_window()); - } + state.update_token_info_from_usage(token_usage, turn_context.model_context_window()); } self.send_token_count_event(turn_context).await; } diff --git a/codex-rs/core/src/state/turn.rs b/codex-rs/core/src/state/turn.rs index ccc50d066..9366880ac 100644 --- a/codex-rs/core/src/state/turn.rs +++ b/codex-rs/core/src/state/turn.rs @@ -15,6 +15,7 @@ use tokio::sync::oneshot; use crate::codex::TurnContext; use crate::protocol::ReviewDecision; +use crate::protocol::TokenUsage; use crate::tasks::SessionTask; /// Metadata about the currently running turn. @@ -73,6 +74,7 @@ pub(crate) struct TurnState { pending_user_input: HashMap>, pending_dynamic_tools: HashMap>, pending_input: Vec, + pub(crate) token_usage_at_turn_start: TokenUsage, } impl TurnState { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 5133dc1e2..a814d36dc 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -199,11 +199,13 @@ impl Session { let mut active = self.active_turn.lock().await; let mut pending_input = Vec::::new(); let mut should_clear_active_turn = false; + let mut token_usage_at_turn_start = None; if let Some(at) = active.as_mut() && at.remove_task(&turn_context.sub_id) { let mut ts = at.turn_state.lock().await; pending_input = ts.take_pending_input(); + token_usage_at_turn_start = Some(ts.token_usage_at_turn_start.clone()); should_clear_active_turn = true; } if should_clear_active_turn { @@ -235,6 +237,52 @@ impl Session { } } } + // Emit token usage metrics. + if let Some(token_usage_at_turn_start) = token_usage_at_turn_start { + let total_token_usage = self.total_token_usage().await.unwrap_or_default(); + let turn_token_usage = crate::protocol::TokenUsage { + input_tokens: (total_token_usage.input_tokens + - token_usage_at_turn_start.input_tokens) + .max(0), + cached_input_tokens: (total_token_usage.cached_input_tokens + - token_usage_at_turn_start.cached_input_tokens) + .max(0), + output_tokens: (total_token_usage.output_tokens + - token_usage_at_turn_start.output_tokens) + .max(0), + reasoning_output_tokens: (total_token_usage.reasoning_output_tokens + - token_usage_at_turn_start.reasoning_output_tokens) + .max(0), + total_tokens: (total_token_usage.total_tokens + - token_usage_at_turn_start.total_tokens) + .max(0), + }; + self.services.otel_manager.histogram( + "codex.turn.token_usage", + turn_token_usage.total_tokens, + &[("token_type", "total")], + ); + self.services.otel_manager.histogram( + "codex.turn.token_usage", + turn_token_usage.input_tokens, + &[("token_type", "input")], + ); + self.services.otel_manager.histogram( + "codex.turn.token_usage", + turn_token_usage.cached_input(), + &[("token_type", "cached_input")], + ); + self.services.otel_manager.histogram( + "codex.turn.token_usage", + turn_token_usage.output_tokens, + &[("token_type", "output")], + ); + self.services.otel_manager.histogram( + "codex.turn.token_usage", + turn_token_usage.reasoning_output_tokens, + &[("token_type", "reasoning_output")], + ); + } let event = EventMsg::TurnComplete(TurnCompleteEvent { turn_id: turn_context.sub_id.clone(), last_agent_message, @@ -243,8 +291,10 @@ impl Session { } async fn register_new_active_task(&self, task: RunningTask) { + let token_usage_at_turn_start = self.total_token_usage().await.unwrap_or_default(); let mut active = self.active_turn.lock().await; let mut turn = ActiveTurn::default(); + turn.turn_state.lock().await.token_usage_at_turn_start = token_usage_at_turn_start; turn.add_task(task); *active = Some(turn); }