feat: add token usage on memories (#11618)
Add aggregated token usage metrics on phase 1 of memories
This commit is contained in:
parent
e6eb6be683
commit
e5e40e2d4b
2 changed files with 181 additions and 51 deletions
|
|
@ -76,6 +76,8 @@ mod metrics {
|
|||
pub(super) const MEMORY_PHASE_ONE_JOBS: &str = "codex.memory.phase1";
|
||||
/// Number of raw memories produced by phase-1 startup extraction.
|
||||
pub(super) const MEMORY_PHASE_ONE_OUTPUT: &str = "codex.memory.phase1.output";
|
||||
/// Histogram for aggregate token usage across one phase-1 startup run.
|
||||
pub(super) const MEMORY_PHASE_ONE_TOKEN_USAGE: &str = "codex.memory.phase1.token_usage";
|
||||
/// Number of phase-2 startup jobs grouped by status.
|
||||
pub(super) const MEMORY_PHASE_TWO_JOBS: &str = "codex.memory.phase2";
|
||||
/// Number of stage-1 memories included in each phase-2 consolidation step.
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ use codex_protocol::models::ResponseItem;
|
|||
use codex_protocol::openai_models::ModelInfo;
|
||||
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use codex_utils_sanitizer::redact_secrets;
|
||||
use futures::StreamExt;
|
||||
use serde::Deserialize;
|
||||
|
|
@ -28,7 +29,7 @@ use tracing::info;
|
|||
use tracing::warn;
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub(in crate::memories) struct Phase1RequestContext {
|
||||
pub(in crate::memories) struct RequestContext {
|
||||
pub(in crate::memories) model_info: ModelInfo,
|
||||
pub(in crate::memories) otel_manager: OtelManager,
|
||||
pub(in crate::memories) reasoning_effort: Option<ReasoningEffortConfig>,
|
||||
|
|
@ -36,18 +37,24 @@ pub(in crate::memories) struct Phase1RequestContext {
|
|||
pub(in crate::memories) turn_metadata_header: Option<String>,
|
||||
}
|
||||
|
||||
struct JobResult {
|
||||
outcome: JobOutcome,
|
||||
token_usage: Option<TokenUsage>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum PhaseOneJobOutcome {
|
||||
enum JobOutcome {
|
||||
SucceededWithOutput,
|
||||
SucceededNoOutput,
|
||||
Failed,
|
||||
}
|
||||
|
||||
struct PhaseOneOutcomeCounts {
|
||||
struct Stats {
|
||||
claimed: usize,
|
||||
succeeded_with_output: usize,
|
||||
succeeded_no_output: usize,
|
||||
failed: usize,
|
||||
total_token_usage: Option<TokenUsage>,
|
||||
}
|
||||
|
||||
/// Phase 1 model output payload.
|
||||
|
|
@ -92,7 +99,7 @@ pub(in crate::memories) async fn run(session: &Arc<Session>) {
|
|||
let outcomes = run_jobs(session, claimed_candidates, stage_one_context).await;
|
||||
|
||||
// 4. Metrics and logs.
|
||||
let counts = count_outcomes(outcomes);
|
||||
let counts = aggregate_stats(outcomes);
|
||||
emit_metrics(session, &counts);
|
||||
info!(
|
||||
"memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed",
|
||||
|
|
@ -118,7 +125,7 @@ pub fn output_schema() -> Value {
|
|||
})
|
||||
}
|
||||
|
||||
impl Phase1RequestContext {
|
||||
impl RequestContext {
|
||||
pub(in crate::memories) fn from_turn_context(
|
||||
turn_context: &TurnContext,
|
||||
turn_metadata_header: Option<String>,
|
||||
|
|
@ -172,9 +179,9 @@ async fn claim_startup_jobs(session: &Arc<Session>) -> Option<Vec<codex_state::S
|
|||
}
|
||||
}
|
||||
|
||||
async fn build_request_context(session: &Arc<Session>) -> Phase1RequestContext {
|
||||
async fn build_request_context(session: &Arc<Session>) -> RequestContext {
|
||||
let turn_context = session.new_default_turn().await;
|
||||
Phase1RequestContext::from_turn_context(
|
||||
RequestContext::from_turn_context(
|
||||
turn_context.as_ref(),
|
||||
turn_context.resolve_turn_metadata_header().await,
|
||||
)
|
||||
|
|
@ -183,8 +190,8 @@ async fn build_request_context(session: &Arc<Session>) -> Phase1RequestContext {
|
|||
async fn run_jobs(
|
||||
session: &Arc<Session>,
|
||||
claimed_candidates: Vec<codex_state::Stage1JobClaim>,
|
||||
stage_one_context: Phase1RequestContext,
|
||||
) -> Vec<PhaseOneJobOutcome> {
|
||||
stage_one_context: RequestContext,
|
||||
) -> Vec<JobResult> {
|
||||
futures::stream::iter(claimed_candidates.into_iter())
|
||||
.map(|claim| {
|
||||
let session = Arc::clone(session);
|
||||
|
|
@ -202,10 +209,10 @@ mod job {
|
|||
pub(in crate::memories) async fn run(
|
||||
session: &Session,
|
||||
claim: codex_state::Stage1JobClaim,
|
||||
stage_one_context: &Phase1RequestContext,
|
||||
) -> PhaseOneJobOutcome {
|
||||
stage_one_context: &RequestContext,
|
||||
) -> JobResult {
|
||||
let thread = claim.thread;
|
||||
let stage_one_output = match sample(
|
||||
let (stage_one_output, token_usage) = match sample(
|
||||
session,
|
||||
&thread.rollout_path,
|
||||
&thread.cwd,
|
||||
|
|
@ -222,23 +229,32 @@ mod job {
|
|||
&reason.to_string(),
|
||||
)
|
||||
.await;
|
||||
return PhaseOneJobOutcome::Failed;
|
||||
return JobResult {
|
||||
outcome: JobOutcome::Failed,
|
||||
token_usage: None,
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
if stage_one_output.raw_memory.is_empty() || stage_one_output.rollout_summary.is_empty() {
|
||||
return result::no_output(session, thread.id, &claim.ownership_token).await;
|
||||
return JobResult {
|
||||
outcome: result::no_output(session, thread.id, &claim.ownership_token).await,
|
||||
token_usage,
|
||||
};
|
||||
}
|
||||
|
||||
result::success(
|
||||
session,
|
||||
thread.id,
|
||||
&claim.ownership_token,
|
||||
thread.updated_at.timestamp(),
|
||||
&stage_one_output.raw_memory,
|
||||
&stage_one_output.rollout_summary,
|
||||
)
|
||||
.await
|
||||
JobResult {
|
||||
outcome: result::success(
|
||||
session,
|
||||
thread.id,
|
||||
&claim.ownership_token,
|
||||
thread.updated_at.timestamp(),
|
||||
&stage_one_output.raw_memory,
|
||||
&stage_one_output.rollout_summary,
|
||||
)
|
||||
.await,
|
||||
token_usage,
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract the rollout and perform the actual sampling.
|
||||
|
|
@ -246,8 +262,8 @@ mod job {
|
|||
session: &Session,
|
||||
rollout_path: &Path,
|
||||
rollout_cwd: &Path,
|
||||
stage_one_context: &Phase1RequestContext,
|
||||
) -> anyhow::Result<StageOneOutput> {
|
||||
stage_one_context: &RequestContext,
|
||||
) -> anyhow::Result<(StageOneOutput, Option<TokenUsage>)> {
|
||||
let (rollout_items, _, _) = RolloutRecorder::load_rollout_items(rollout_path).await?;
|
||||
let rollout_contents = serialize_filtered_rollout_response_items(&rollout_items)?;
|
||||
|
||||
|
|
@ -290,6 +306,7 @@ mod job {
|
|||
// TODO(jif) we should have a shared helper somewhere for this.
|
||||
// Unwrap the stream.
|
||||
let mut result = String::new();
|
||||
let mut token_usage = None;
|
||||
while let Some(message) = stream.next().await.transpose()? {
|
||||
match message {
|
||||
ResponseEvent::OutputTextDelta(delta) => result.push_str(&delta),
|
||||
|
|
@ -301,7 +318,12 @@ mod job {
|
|||
result.push_str(&text);
|
||||
}
|
||||
}
|
||||
ResponseEvent::Completed { .. } => break,
|
||||
ResponseEvent::Completed {
|
||||
token_usage: usage, ..
|
||||
} => {
|
||||
token_usage = usage;
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
|
@ -310,7 +332,7 @@ mod job {
|
|||
output.raw_memory = redact_secrets(output.raw_memory);
|
||||
output.rollout_summary = redact_secrets(output.rollout_summary);
|
||||
|
||||
Ok(output)
|
||||
Ok((output, token_usage))
|
||||
}
|
||||
|
||||
mod result {
|
||||
|
|
@ -339,9 +361,9 @@ mod job {
|
|||
session: &Session,
|
||||
thread_id: codex_protocol::ThreadId,
|
||||
ownership_token: &str,
|
||||
) -> PhaseOneJobOutcome {
|
||||
) -> JobOutcome {
|
||||
let Some(state_db) = session.services.state_db.as_deref() else {
|
||||
return PhaseOneJobOutcome::Failed;
|
||||
return JobOutcome::Failed;
|
||||
};
|
||||
|
||||
if state_db
|
||||
|
|
@ -349,9 +371,9 @@ mod job {
|
|||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
PhaseOneJobOutcome::SucceededNoOutput
|
||||
JobOutcome::SucceededNoOutput
|
||||
} else {
|
||||
PhaseOneJobOutcome::Failed
|
||||
JobOutcome::Failed
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -362,9 +384,9 @@ mod job {
|
|||
source_updated_at: i64,
|
||||
raw_memory: &str,
|
||||
rollout_summary: &str,
|
||||
) -> PhaseOneJobOutcome {
|
||||
) -> JobOutcome {
|
||||
let Some(state_db) = session.services.state_db.as_deref() else {
|
||||
return PhaseOneJobOutcome::Failed;
|
||||
return JobOutcome::Failed;
|
||||
};
|
||||
|
||||
if state_db
|
||||
|
|
@ -378,9 +400,9 @@ mod job {
|
|||
.await
|
||||
.unwrap_or(false)
|
||||
{
|
||||
PhaseOneJobOutcome::SucceededWithOutput
|
||||
JobOutcome::SucceededWithOutput
|
||||
} else {
|
||||
PhaseOneJobOutcome::Failed
|
||||
JobOutcome::Failed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -407,29 +429,37 @@ mod job {
|
|||
}
|
||||
}
|
||||
|
||||
fn count_outcomes(outcomes: Vec<PhaseOneJobOutcome>) -> PhaseOneOutcomeCounts {
|
||||
let succeeded_with_output = outcomes
|
||||
.iter()
|
||||
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput))
|
||||
.count();
|
||||
let succeeded_no_output = outcomes
|
||||
.iter()
|
||||
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput))
|
||||
.count();
|
||||
let failed = outcomes
|
||||
.iter()
|
||||
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed))
|
||||
.count();
|
||||
fn aggregate_stats(outcomes: Vec<JobResult>) -> Stats {
|
||||
let claimed = outcomes.len();
|
||||
let mut succeeded_with_output = 0;
|
||||
let mut succeeded_no_output = 0;
|
||||
let mut failed = 0;
|
||||
let mut total_token_usage = TokenUsage::default();
|
||||
let mut has_token_usage = false;
|
||||
|
||||
PhaseOneOutcomeCounts {
|
||||
claimed: outcomes.len(),
|
||||
for outcome in outcomes {
|
||||
match outcome.outcome {
|
||||
JobOutcome::SucceededWithOutput => succeeded_with_output += 1,
|
||||
JobOutcome::SucceededNoOutput => succeeded_no_output += 1,
|
||||
JobOutcome::Failed => failed += 1,
|
||||
}
|
||||
|
||||
if let Some(token_usage) = outcome.token_usage {
|
||||
total_token_usage.add_assign(&token_usage);
|
||||
has_token_usage = true;
|
||||
}
|
||||
}
|
||||
|
||||
Stats {
|
||||
claimed,
|
||||
succeeded_with_output,
|
||||
succeeded_no_output,
|
||||
failed,
|
||||
total_token_usage: has_token_usage.then_some(total_token_usage),
|
||||
}
|
||||
}
|
||||
|
||||
fn emit_metrics(session: &Session, counts: &PhaseOneOutcomeCounts) {
|
||||
fn emit_metrics(session: &Session, counts: &Stats) {
|
||||
if counts.claimed > 0 {
|
||||
session.services.otel_manager.counter(
|
||||
metrics::MEMORY_PHASE_ONE_JOBS,
|
||||
|
|
@ -463,4 +493,102 @@ fn emit_metrics(session: &Session, counts: &PhaseOneOutcomeCounts) {
|
|||
&[("status", "failed")],
|
||||
);
|
||||
}
|
||||
if let Some(token_usage) = counts.total_token_usage.as_ref() {
|
||||
session.services.otel_manager.histogram(
|
||||
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
|
||||
token_usage.total_tokens.max(0),
|
||||
&[("token_type", "total")],
|
||||
);
|
||||
session.services.otel_manager.histogram(
|
||||
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
|
||||
token_usage.input_tokens.max(0),
|
||||
&[("token_type", "input")],
|
||||
);
|
||||
session.services.otel_manager.histogram(
|
||||
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
|
||||
token_usage.cached_input(),
|
||||
&[("token_type", "cached_input")],
|
||||
);
|
||||
session.services.otel_manager.histogram(
|
||||
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
|
||||
token_usage.output_tokens.max(0),
|
||||
&[("token_type", "output")],
|
||||
);
|
||||
session.services.otel_manager.histogram(
|
||||
metrics::MEMORY_PHASE_ONE_TOKEN_USAGE,
|
||||
token_usage.reasoning_output_tokens.max(0),
|
||||
&[("token_type", "reasoning_output")],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::JobOutcome;
|
||||
use super::JobResult;
|
||||
use super::aggregate_stats;
|
||||
use codex_protocol::protocol::TokenUsage;
|
||||
use pretty_assertions::assert_eq;
|
||||
|
||||
#[test]
|
||||
fn count_outcomes_sums_token_usage_across_all_jobs() {
|
||||
let counts = aggregate_stats(vec![
|
||||
JobResult {
|
||||
outcome: JobOutcome::SucceededWithOutput,
|
||||
token_usage: Some(TokenUsage {
|
||||
input_tokens: 10,
|
||||
cached_input_tokens: 2,
|
||||
output_tokens: 3,
|
||||
reasoning_output_tokens: 1,
|
||||
total_tokens: 13,
|
||||
}),
|
||||
},
|
||||
JobResult {
|
||||
outcome: JobOutcome::SucceededNoOutput,
|
||||
token_usage: Some(TokenUsage {
|
||||
input_tokens: 7,
|
||||
cached_input_tokens: 1,
|
||||
output_tokens: 2,
|
||||
reasoning_output_tokens: 0,
|
||||
total_tokens: 9,
|
||||
}),
|
||||
},
|
||||
JobResult {
|
||||
outcome: JobOutcome::Failed,
|
||||
token_usage: None,
|
||||
},
|
||||
]);
|
||||
|
||||
assert_eq!(counts.claimed, 3);
|
||||
assert_eq!(counts.succeeded_with_output, 1);
|
||||
assert_eq!(counts.succeeded_no_output, 1);
|
||||
assert_eq!(counts.failed, 1);
|
||||
assert_eq!(
|
||||
counts.total_token_usage,
|
||||
Some(TokenUsage {
|
||||
input_tokens: 17,
|
||||
cached_input_tokens: 3,
|
||||
output_tokens: 5,
|
||||
reasoning_output_tokens: 1,
|
||||
total_tokens: 22,
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn count_outcomes_keeps_usage_empty_when_no_job_reports_it() {
|
||||
let counts = aggregate_stats(vec![
|
||||
JobResult {
|
||||
outcome: JobOutcome::SucceededWithOutput,
|
||||
token_usage: None,
|
||||
},
|
||||
JobResult {
|
||||
outcome: JobOutcome::Failed,
|
||||
token_usage: None,
|
||||
},
|
||||
]);
|
||||
|
||||
assert_eq!(counts.claimed, 2);
|
||||
assert_eq!(counts.total_token_usage, None);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue