From 2293ab0e2108ff5835280dfb303ef3c6bcd141b0 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 18 Feb 2026 11:33:55 +0000 Subject: [PATCH] feat: phase 2 usage (#12121) --- codex-rs/core/src/agent/control.rs | 11 ++++++++++ codex-rs/core/src/codex.rs | 5 +++++ codex-rs/core/src/codex_thread.rs | 5 +++++ codex-rs/core/src/memories/mod.rs | 2 ++ codex-rs/core/src/memories/phase2.rs | 33 ++++++++++++++++++++++++++++ 5 files changed, 56 insertions(+) diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index b6cd0d82a..2a95ca6eb 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -6,6 +6,7 @@ use crate::thread_manager::ThreadManagerState; use codex_protocol::ThreadId; use codex_protocol::protocol::Op; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::TokenUsage; use codex_protocol::user_input::UserInput; use std::path::PathBuf; use std::sync::Arc; @@ -153,6 +154,16 @@ impl AgentControl { Ok(thread.subscribe_status()) } + pub(crate) async fn get_total_token_usage(&self, agent_id: ThreadId) -> Option { + let Ok(state) = self.upgrade() else { + return None; + }; + let Ok(thread) = state.get_thread(agent_id).await else { + return None; + }; + thread.total_token_usage().await + } + fn upgrade(&self) -> CodexResult> { self.manager .upgrade() diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 8f4660a6e..5e4c9bd95 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1498,6 +1498,11 @@ impl Session { state.history.get_total_token_usage_breakdown() } + pub(crate) async fn total_token_usage(&self) -> Option { + let state = self.state.lock().await; + state.token_info().map(|info| info.total_token_usage) + } + pub(crate) async fn get_estimated_token_count( &self, turn_context: &TurnContext, diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 88a052615..0b137ca2a 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -11,6 +11,7 @@ use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::TokenUsage; use codex_protocol::user_input::UserInput; use std::path::PathBuf; use tokio::sync::watch; @@ -73,6 +74,10 @@ impl CodexThread { self.codex.agent_status.clone() } + pub(crate) async fn total_token_usage(&self) -> Option { + self.codex.session.total_token_usage().await + } + pub fn rollout_path(&self) -> Option { self.rollout_path.clone() } diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index 183ff416b..ddf9878d7 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -79,6 +79,8 @@ mod metrics { pub(super) const MEMORY_PHASE_TWO_E2E_MS: &str = "codex.memory.phase2.e2e_ms"; /// Number of stage-1 memories included in each phase-2 consolidation step. pub(super) const MEMORY_PHASE_TWO_INPUT: &str = "codex.memory.phase2.input"; + /// Histogram for aggregate token usage across one phase-2 consolidation run. + pub(super) const MEMORY_PHASE_TWO_TOKEN_USAGE: &str = "codex.memory.phase2.token_usage"; } use std::path::Path; diff --git a/codex-rs/core/src/memories/phase2.rs b/codex-rs/core/src/memories/phase2.rs index 134b5609e..aa17cc158 100644 --- a/codex-rs/core/src/memories/phase2.rs +++ b/codex-rs/core/src/memories/phase2.rs @@ -14,6 +14,7 @@ use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; +use codex_protocol::protocol::TokenUsage; use codex_protocol::user_input::UserInput; use codex_state::StateRuntime; use codex_utils_absolute_path::AbsolutePathBuf; @@ -308,6 +309,9 @@ mod agent { .await; if matches!(final_status, AgentStatus::Completed(_)) { + if let Some(token_usage) = agent_control.get_total_token_usage(thread_id).await { + emit_token_usage_metrics(&session, &token_usage); + } job::succeed(&session, &db, &claim, new_watermark, "succeeded").await; } else { job::failed(&session, &db, &claim, "failed_agent").await; @@ -404,3 +408,32 @@ fn emit_metrics(session: &Arc, counters: Counters) { &[("status", "agent_spawned")], ); } + +fn emit_token_usage_metrics(session: &Arc, token_usage: &TokenUsage) { + let otel = session.services.otel_manager.clone(); + otel.histogram( + metrics::MEMORY_PHASE_TWO_TOKEN_USAGE, + token_usage.total_tokens.max(0), + &[("token_type", "total")], + ); + otel.histogram( + metrics::MEMORY_PHASE_TWO_TOKEN_USAGE, + token_usage.input_tokens.max(0), + &[("token_type", "input")], + ); + otel.histogram( + metrics::MEMORY_PHASE_TWO_TOKEN_USAGE, + token_usage.cached_input(), + &[("token_type", "cached_input")], + ); + otel.histogram( + metrics::MEMORY_PHASE_TWO_TOKEN_USAGE, + token_usage.output_tokens.max(0), + &[("token_type", "output")], + ); + otel.histogram( + metrics::MEMORY_PHASE_TWO_TOKEN_USAGE, + token_usage.reasoning_output_tokens.max(0), + &[("token_type", "reasoning_output")], + ); +}