diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index 30d6c4279..179c90cbb 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -76,6 +76,8 @@ mod metrics { pub(super) const MEMORY_PHASE_ONE_JOBS: &str = "codex.memory.phase1"; /// Number of raw memories produced by phase-1 startup extraction. pub(super) const MEMORY_PHASE_ONE_OUTPUT: &str = "codex.memory.phase1.output"; + /// Histogram for aggregate token usage across one phase-1 startup run. + pub(super) const MEMORY_PHASE_ONE_TOKEN_USAGE: &str = "codex.memory.phase1.token_usage"; /// Number of phase-2 startup jobs grouped by status. pub(super) const MEMORY_PHASE_TWO_JOBS: &str = "codex.memory.phase2"; /// Number of stage-1 memories included in each phase-2 consolidation step. diff --git a/codex-rs/core/src/memories/phase1.rs b/codex-rs/core/src/memories/phase1.rs index a7c171d3e..e11d29009 100644 --- a/codex-rs/core/src/memories/phase1.rs +++ b/codex-rs/core/src/memories/phase1.rs @@ -17,6 +17,7 @@ use codex_protocol::models::ResponseItem; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::TokenUsage; use codex_utils_sanitizer::redact_secrets; use futures::StreamExt; use serde::Deserialize; @@ -28,7 +29,7 @@ use tracing::info; use tracing::warn; #[derive(Clone, Debug)] -pub(in crate::memories) struct Phase1RequestContext { +pub(in crate::memories) struct RequestContext { pub(in crate::memories) model_info: ModelInfo, pub(in crate::memories) otel_manager: OtelManager, pub(in crate::memories) reasoning_effort: Option, @@ -36,18 +37,24 @@ pub(in crate::memories) struct Phase1RequestContext { pub(in crate::memories) turn_metadata_header: Option, } +struct JobResult { + outcome: JobOutcome, + token_usage: Option, +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum PhaseOneJobOutcome { +enum JobOutcome { SucceededWithOutput, SucceededNoOutput, Failed, } -struct PhaseOneOutcomeCounts { +struct Stats { claimed: usize, succeeded_with_output: usize, succeeded_no_output: usize, failed: usize, + total_token_usage: Option, } /// Phase 1 model output payload. @@ -92,7 +99,7 @@ pub(in crate::memories) async fn run(session: &Arc) { let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await; // 4. Metrics and logs. - let counts = count_outcomes(outcomes); + let counts = aggregate_stats(outcomes); emit_metrics(session, &counts); info!( "memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed", @@ -118,7 +125,7 @@ pub fn output_schema() -> Value { }) } -impl Phase1RequestContext { +impl RequestContext { pub(in crate::memories) fn from_turn_context( turn_context: &TurnContext, turn_metadata_header: Option, @@ -172,9 +179,9 @@ async fn claim_startup_jobs(session: &Arc) -> Option) -> Phase1RequestContext { +async fn build_request_context(session: &Arc) -> RequestContext { let turn_context = session.new_default_turn().await; - Phase1RequestContext::from_turn_context( + RequestContext::from_turn_context( turn_context.as_ref(), turn_context.resolve_turn_metadata_header().await, ) @@ -183,8 +190,8 @@ async fn build_request_context(session: &Arc) -> Phase1RequestContext { async fn run_jobs( session: &Arc, claimed_candidates: Vec, - stage_one_context: Phase1RequestContext, -) -> Vec { + stage_one_context: RequestContext, +) -> Vec { futures::stream::iter(claimed_candidates.into_iter()) .map(|claim| { let session = Arc::clone(session); @@ -202,10 +209,10 @@ mod job { pub(in crate::memories) async fn run( session: &Session, claim: codex_state::Stage1JobClaim, - stage_one_context: &Phase1RequestContext, - ) -> PhaseOneJobOutcome { + stage_one_context: &RequestContext, + ) -> JobResult { let thread = claim.thread; - let stage_one_output = match sample( + let (stage_one_output, token_usage) = match sample( session, &thread.rollout_path, &thread.cwd, @@ -222,23 +229,32 @@ mod job { &reason.to_string(), ) .await; - return PhaseOneJobOutcome::Failed; + return JobResult { + outcome: JobOutcome::Failed, + token_usage: None, + }; } }; if stage_one_output.raw_memory.is_empty() || stage_one_output.rollout_summary.is_empty() { - return result::no_output(session, thread.id, &claim.ownership_token).await; + return JobResult { + outcome: result::no_output(session, thread.id, &claim.ownership_token).await, + token_usage, + }; } - result::success( - session, - thread.id, - &claim.ownership_token, - thread.updated_at.timestamp(), - &stage_one_output.raw_memory, - &stage_one_output.rollout_summary, - ) - .await + JobResult { + outcome: result::success( + session, + thread.id, + &claim.ownership_token, + thread.updated_at.timestamp(), + &stage_one_output.raw_memory, + &stage_one_output.rollout_summary, + ) + .await, + token_usage, + } } /// Extract the rollout and perform the actual sampling. @@ -246,8 +262,8 @@ mod job { session: &Session, rollout_path: &Path, rollout_cwd: &Path, - stage_one_context: &Phase1RequestContext, - ) -> anyhow::Result { + stage_one_context: &RequestContext, + ) -> anyhow::Result<(StageOneOutput, Option)> { let (rollout_items, _, _) = RolloutRecorder::load_rollout_items(rollout_path).await?; let rollout_contents = serialize_filtered_rollout_response_items(&rollout_items)?; @@ -290,6 +306,7 @@ mod job { // TODO(jif) we should have a shared helper somewhere for this. // Unwrap the stream. let mut result = String::new(); + let mut token_usage = None; while let Some(message) = stream.next().await.transpose()? { match message { ResponseEvent::OutputTextDelta(delta) => result.push_str(&delta), @@ -301,7 +318,12 @@ mod job { result.push_str(&text); } } - ResponseEvent::Completed { .. } => break, + ResponseEvent::Completed { + token_usage: usage, .. + } => { + token_usage = usage; + break; + } _ => {} } } @@ -310,7 +332,7 @@ mod job { output.raw_memory = redact_secrets(output.raw_memory); output.rollout_summary = redact_secrets(output.rollout_summary); - Ok(output) + Ok((output, token_usage)) } mod result { @@ -339,9 +361,9 @@ mod job { session: &Session, thread_id: codex_protocol::ThreadId, ownership_token: &str, - ) -> PhaseOneJobOutcome { + ) -> JobOutcome { let Some(state_db) = session.services.state_db.as_deref() else { - return PhaseOneJobOutcome::Failed; + return JobOutcome::Failed; }; if state_db @@ -349,9 +371,9 @@ mod job { .await .unwrap_or(false) { - PhaseOneJobOutcome::SucceededNoOutput + JobOutcome::SucceededNoOutput } else { - PhaseOneJobOutcome::Failed + JobOutcome::Failed } } @@ -362,9 +384,9 @@ mod job { source_updated_at: i64, raw_memory: &str, rollout_summary: &str, - ) -> PhaseOneJobOutcome { + ) -> JobOutcome { let Some(state_db) = session.services.state_db.as_deref() else { - return PhaseOneJobOutcome::Failed; + return JobOutcome::Failed; }; if state_db @@ -378,9 +400,9 @@ mod job { .await .unwrap_or(false) { - PhaseOneJobOutcome::SucceededWithOutput + JobOutcome::SucceededWithOutput } else { - PhaseOneJobOutcome::Failed + JobOutcome::Failed } } } @@ -407,29 +429,37 @@ mod job { } } -fn count_outcomes(outcomes: Vec) -> PhaseOneOutcomeCounts { - let succeeded_with_output = outcomes - .iter() - .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput)) - .count(); - let succeeded_no_output = outcomes - .iter() - .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput)) - .count(); - let failed = outcomes - .iter() - .filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed)) - .count(); +fn aggregate_stats(outcomes: Vec) -> Stats { + let claimed = outcomes.len(); + let mut succeeded_with_output = 0; + let mut succeeded_no_output = 0; + let mut failed = 0; + let mut total_token_usage = TokenUsage::default(); + let mut has_token_usage = false; - PhaseOneOutcomeCounts { - claimed: outcomes.len(), + for outcome in outcomes { + match outcome.outcome { + JobOutcome::SucceededWithOutput => succeeded_with_output += 1, + JobOutcome::SucceededNoOutput => succeeded_no_output += 1, + JobOutcome::Failed => failed += 1, + } + + if let Some(token_usage) = outcome.token_usage { + total_token_usage.add_assign(&token_usage); + has_token_usage = true; + } + } + + Stats { + claimed, succeeded_with_output, succeeded_no_output, failed, + total_token_usage: has_token_usage.then_some(total_token_usage), } } -fn emit_metrics(session: &Session, counts: &PhaseOneOutcomeCounts) { +fn emit_metrics(session: &Session, counts: &Stats) { if counts.claimed > 0 { session.services.otel_manager.counter( metrics::MEMORY_PHASE_ONE_JOBS, @@ -463,4 +493,102 @@ fn emit_metrics(session: &Session, counts: &PhaseOneOutcomeCounts) { &[("status", "failed")], ); } + if let Some(token_usage) = counts.total_token_usage.as_ref() { + session.services.otel_manager.histogram( + metrics::MEMORY_PHASE_ONE_TOKEN_USAGE, + token_usage.total_tokens.max(0), + &[("token_type", "total")], + ); + session.services.otel_manager.histogram( + metrics::MEMORY_PHASE_ONE_TOKEN_USAGE, + token_usage.input_tokens.max(0), + &[("token_type", "input")], + ); + session.services.otel_manager.histogram( + metrics::MEMORY_PHASE_ONE_TOKEN_USAGE, + token_usage.cached_input(), + &[("token_type", "cached_input")], + ); + session.services.otel_manager.histogram( + metrics::MEMORY_PHASE_ONE_TOKEN_USAGE, + token_usage.output_tokens.max(0), + &[("token_type", "output")], + ); + session.services.otel_manager.histogram( + metrics::MEMORY_PHASE_ONE_TOKEN_USAGE, + token_usage.reasoning_output_tokens.max(0), + &[("token_type", "reasoning_output")], + ); + } +} + +#[cfg(test)] +mod tests { + use super::JobOutcome; + use super::JobResult; + use super::aggregate_stats; + use codex_protocol::protocol::TokenUsage; + use pretty_assertions::assert_eq; + + #[test] + fn count_outcomes_sums_token_usage_across_all_jobs() { + let counts = aggregate_stats(vec![ + JobResult { + outcome: JobOutcome::SucceededWithOutput, + token_usage: Some(TokenUsage { + input_tokens: 10, + cached_input_tokens: 2, + output_tokens: 3, + reasoning_output_tokens: 1, + total_tokens: 13, + }), + }, + JobResult { + outcome: JobOutcome::SucceededNoOutput, + token_usage: Some(TokenUsage { + input_tokens: 7, + cached_input_tokens: 1, + output_tokens: 2, + reasoning_output_tokens: 0, + total_tokens: 9, + }), + }, + JobResult { + outcome: JobOutcome::Failed, + token_usage: None, + }, + ]); + + assert_eq!(counts.claimed, 3); + assert_eq!(counts.succeeded_with_output, 1); + assert_eq!(counts.succeeded_no_output, 1); + assert_eq!(counts.failed, 1); + assert_eq!( + counts.total_token_usage, + Some(TokenUsage { + input_tokens: 17, + cached_input_tokens: 3, + output_tokens: 5, + reasoning_output_tokens: 1, + total_tokens: 22, + }) + ); + } + + #[test] + fn count_outcomes_keeps_usage_empty_when_no_job_reports_it() { + let counts = aggregate_stats(vec![ + JobResult { + outcome: JobOutcome::SucceededWithOutput, + token_usage: None, + }, + JobResult { + outcome: JobOutcome::Failed, + token_usage: None, + }, + ]); + + assert_eq!(counts.claimed, 2); + assert_eq!(counts.total_token_usage, None); + } }