feat: metrics to memories (#11593)

This commit is contained in:
jif-oai 2026-02-12 15:28:48 +00:00 committed by GitHub
parent 04b60d65b3
commit aeaa68347f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 204 additions and 15 deletions

View file

@ -63,6 +63,17 @@ mod phase_two {
pub(super) const JOB_HEARTBEAT_SECONDS: u64 = 30;
}
mod metrics {
    // OTel metric names for the memories startup pipeline.
    // Phase 1 = per-thread stage-1 extraction; phase 2 = global consolidation.

    /// Count of phase-1 startup jobs, labeled by `status`.
    pub(super) const MEMORY_PHASE_ONE_JOBS: &str = "codex.memory.phase1";
    /// Count of phase-2 startup jobs, labeled by `status`.
    pub(super) const MEMORY_PHASE_TWO_JOBS: &str = "codex.memory.phase2";
    /// Count of raw memories emitted by phase-1 startup extraction.
    pub(super) const MEMORY_PHASE_ONE_OUTPUT: &str = "codex.memory.phase1.output";
    /// Count of stage-1 memories fed into each phase-2 consolidation step.
    pub(super) const MEMORY_PHASE_TWO_INPUT: &str = "codex.memory.phase2.input";
}
/// Returns the directory under `codex_home` where memory artifacts are stored.
pub fn memory_root(codex_home: &Path) -> PathBuf {
    let mut root = codex_home.to_path_buf();
    root.push("memories");
    root
}

View file

@ -2,6 +2,7 @@ use crate::codex::Session;
use crate::config::Config;
use crate::config::Constrained;
use crate::memories::memory_root;
use crate::memories::metrics;
use crate::memories::phase_two;
use crate::memories::prompts::build_consolidation_prompt;
use crate::memories::startup::phase2::spawn_phase2_completion_task;
@ -34,8 +35,14 @@ pub(super) async fn run_global_memory_consolidation(
session: &Arc<Session>,
config: Arc<Config>,
) -> bool {
let otel_manager = &session.services.otel_manager;
let Some(state_db) = session.services.state_db.as_deref() else {
warn!("state db unavailable; skipping global memory consolidation");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_state_db_unavailable")],
);
return false;
};
@ -46,6 +53,11 @@ pub(super) async fn run_global_memory_consolidation(
Ok(claim) => claim,
Err(err) => {
warn!("state db try_claim_global_phase2_job failed during memories startup: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_claim")],
);
return false;
}
};
@ -53,13 +65,26 @@ pub(super) async fn run_global_memory_consolidation(
codex_state::Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => (ownership_token, input_watermark),
} => {
otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "claimed")]);
(ownership_token, input_watermark)
}
codex_state::Phase2JobClaimOutcome::SkippedNotDirty => {
debug!("memory phase-2 global lock is up-to-date; skipping consolidation");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_not_dirty")],
);
return false;
}
codex_state::Phase2JobClaimOutcome::SkippedRunning => {
debug!("memory phase-2 global consolidation already running; skipping");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "skipped_running")],
);
return false;
}
};
@ -89,6 +114,11 @@ pub(super) async fn run_global_memory_consolidation(
.set(consolidation_sandbox_policy)
{
warn!("memory phase-2 consolidation sandbox policy was rejected by constraints: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_sandbox_policy")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
@ -108,6 +138,11 @@ pub(super) async fn run_global_memory_consolidation(
Ok(memories) => memories,
Err(err) => {
warn!("state db list_stage1_outputs_for_global failed during consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_load_stage1_outputs")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
@ -118,9 +153,21 @@ pub(super) async fn run_global_memory_consolidation(
return false;
}
};
if !latest_memories.is_empty() {
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_INPUT,
latest_memories.len() as i64,
&[],
);
}
let completion_watermark = completion_watermark(claimed_watermark, &latest_memories);
if let Err(err) = sync_rollout_summaries_from_memories(&root, &latest_memories).await {
warn!("failed syncing local memory artifacts for global consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_sync_artifacts")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
@ -133,6 +180,11 @@ pub(super) async fn run_global_memory_consolidation(
if let Err(err) = rebuild_raw_memories_file_from_memories(&root, &latest_memories).await {
warn!("failed rebuilding raw memories aggregate for global consolidation: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_rebuild_raw_memories")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,
@ -147,6 +199,11 @@ pub(super) async fn run_global_memory_consolidation(
let _ = state_db
.mark_global_phase2_job_succeeded(&ownership_token, completion_watermark)
.await;
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "succeeded_no_input")],
);
return false;
}
@ -169,6 +226,11 @@ pub(super) async fn run_global_memory_consolidation(
info!(
"memory phase-2 global consolidation agent started: agent_id={consolidation_agent_id}"
);
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "agent_spawned")],
);
spawn_phase2_completion_task(
session.as_ref(),
ownership_token,
@ -179,6 +241,11 @@ pub(super) async fn run_global_memory_consolidation(
}
Err(err) => {
warn!("failed to spawn global memory consolidation agent: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_spawn_agent")],
);
let _ = state_db
.mark_global_phase2_job_failed(
&ownership_token,

View file

@ -7,6 +7,7 @@ use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::memories::metrics;
use crate::memories::phase_one;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use codex_otel::OtelManager;
@ -19,6 +20,13 @@ use std::sync::Arc;
use tracing::info;
use tracing::warn;
/// Outcome of a single phase-1 (stage-1) memory-extraction job, used to
/// bucket per-job results into the phase-1 metrics counters.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PhaseOneJobOutcome {
/// Extraction succeeded and produced raw-memory/rollout-summary output.
SucceededWithOutput,
/// Extraction succeeded but produced no output (empty raw memory and summary).
SucceededNoOutput,
/// Extraction failed, or recording success in the state db failed.
Failed,
}
/// Upper bound on threads considered during the phase-1 startup scan.
/// NOTE(review): the consuming scan is not visible in this chunk — confirm
/// this caps candidate selection rather than job claiming.
pub(super) const PHASE_ONE_THREAD_SCAN_LIMIT: usize = 5_000;
#[derive(Clone)]
@ -79,8 +87,19 @@ pub(super) async fn run_memories_startup_pipeline(
session: &Arc<Session>,
config: Arc<Config>,
) -> CodexResult<()> {
let otel_manager = &session.services.otel_manager;
// Without a state db we cannot claim or record stage-1 jobs; bail out early.
let Some(state_db) = session.services.state_db.as_deref() else {
    warn!("state db unavailable for memories startup pipeline; skipping");
    // Emit the skip counter exactly once. (The original committed code
    // emitted this identical counter twice back-to-back, double-counting
    // skipped pipelines; the phase-2 path records the same condition once.)
    otel_manager.counter(
        metrics::MEMORY_PHASE_ONE_JOBS,
        1,
        &[("status", "skipped_state_db_unavailable")],
    );
    return Ok(());
};
@ -106,12 +125,24 @@ pub(super) async fn run_memories_startup_pipeline(
Ok(claims) => claims,
Err(err) => {
warn!("state db claim_stage1_jobs_for_startup failed during memories startup: {err}");
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "failed_claim")],
);
Vec::new()
}
};
let claimed_count = claimed_candidates.len();
let mut succeeded_count = 0;
if claimed_count == 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
1,
&[("status", "skipped_no_candidates")],
);
}
let mut phase_one_outcomes = Vec::new();
if claimed_count > 0 {
let turn_context = session.new_default_turn().await;
let stage_one_context = StageOneRequestContext::from_turn_context(
@ -119,7 +150,7 @@ pub(super) async fn run_memories_startup_pipeline(
turn_context.resolve_turn_metadata_header().await,
);
succeeded_count = futures::stream::iter(claimed_candidates.into_iter())
phase_one_outcomes = futures::stream::iter(claimed_candidates.into_iter())
.map(|claim| {
let session = Arc::clone(session);
let stage_one_context = stage_one_context.clone();
@ -145,24 +176,29 @@ pub(super) async fn run_memories_startup_pipeline(
)
.await;
}
return false;
return PhaseOneJobOutcome::Failed;
}
};
let Some(state_db) = session.services.state_db.as_deref() else {
return false;
return PhaseOneJobOutcome::Failed;
};
if stage_one_output.raw_memory.is_empty()
&& stage_one_output.rollout_summary.is_empty()
{
return state_db
return if state_db
.mark_stage1_job_succeeded_no_output(thread.id, &claim.ownership_token)
.await
.unwrap_or(false);
.unwrap_or(false)
{
PhaseOneJobOutcome::SucceededNoOutput
} else {
PhaseOneJobOutcome::Failed
};
}
state_db
if state_db
.mark_stage1_job_succeeded(
thread.id,
&claim.ownership_token,
@ -172,19 +208,73 @@ pub(super) async fn run_memories_startup_pipeline(
)
.await
.unwrap_or(false)
{
PhaseOneJobOutcome::SucceededWithOutput
} else {
PhaseOneJobOutcome::Failed
}
}
})
.buffer_unordered(phase_one::CONCURRENCY_LIMIT)
.collect::<Vec<bool>>()
.await
.into_iter()
.filter(|ok| *ok)
.count();
.collect::<Vec<PhaseOneJobOutcome>>()
.await;
}
let succeeded_with_output_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededWithOutput))
.count();
let succeeded_no_output_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::SucceededNoOutput))
.count();
let failed_count = phase_one_outcomes
.iter()
.filter(|outcome| matches!(outcome, PhaseOneJobOutcome::Failed))
.count();
let succeeded_count = succeeded_with_output_count + succeeded_no_output_count;
if claimed_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
claimed_count as i64,
&[("status", "claimed")],
);
}
if succeeded_with_output_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
succeeded_with_output_count as i64,
&[("status", "succeeded")],
);
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_OUTPUT,
succeeded_with_output_count as i64,
&[],
);
}
if succeeded_no_output_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
succeeded_no_output_count as i64,
&[("status", "succeeded_no_output")],
);
}
if failed_count > 0 {
otel_manager.counter(
metrics::MEMORY_PHASE_ONE_JOBS,
failed_count as i64,
&[("status", "failed")],
);
}
info!(
"memory stage-1 extraction complete: {} job(s) claimed, {} succeeded",
claimed_count, succeeded_count
"memory stage-1 extraction complete: {} job(s) claimed, {} succeeded ({} with output, {} no output), {} failed",
claimed_count,
succeeded_count,
succeeded_with_output_count,
succeeded_no_output_count,
failed_count
);
let consolidation_job_count =

View file

@ -1,6 +1,7 @@
use crate::agent::AgentStatus;
use crate::agent::status::is_final as is_final_agent_status;
use crate::codex::Session;
use crate::memories::metrics;
use crate::memories::phase_two;
use codex_protocol::ThreadId;
use std::sync::Arc;
@ -18,6 +19,7 @@ pub(super) fn spawn_phase2_completion_task(
) {
let state_db = session.services.state_db.clone();
let agent_control = session.services.agent_control.clone();
let otel_manager = session.services.otel_manager.clone();
tokio::spawn(async move {
let Some(state_db) = state_db else {
@ -30,6 +32,11 @@ pub(super) fn spawn_phase2_completion_task(
warn!(
"failed to subscribe to global memory consolidation agent {consolidation_agent_id}: {err}"
);
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_subscribe_status")],
);
mark_phase2_failed_with_recovery(
state_db.as_ref(),
&ownership_token,
@ -49,8 +56,22 @@ pub(super) fn spawn_phase2_completion_task(
)
.await;
if matches!(final_status, AgentStatus::Shutdown | AgentStatus::NotFound) {
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "failed_agent_unavailable")],
);
return;
}
if is_phase2_success(&final_status) {
otel_manager.counter(
metrics::MEMORY_PHASE_TWO_JOBS,
1,
&[("status", "succeeded")],
);
} else {
otel_manager.counter(metrics::MEMORY_PHASE_TWO_JOBS, 1, &[("status", "failed")]);
}
tokio::spawn(async move {
if let Err(err) = agent_control.shutdown_agent(consolidation_agent_id).await {