fix: db stuff mem (#11575)

* Documenting DB functions
* Fixed a nit where stage 2 was sorting the stage-1 outputs in the wrong
direction
* Added some tests
This commit is contained in:
jif-oai 2026-02-12 12:53:47 +00:00 committed by GitHub
parent adad23f743
commit 19ab038488
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 463 additions and 23 deletions

View file

@ -1432,6 +1432,104 @@ WHERE id = 1
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies that the startup claim query prefilters threads whose stage-1
// memory is already current, so a tiny `scan_limit` is not wasted on them.
#[tokio::test]
async fn claim_stage1_jobs_prefilters_threads_with_up_to_date_memory() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let now = Utc::now();
// Both timestamps fall inside the eligibility window used below
// (idle for at least `min_rollout_idle_hours: 12`, newer than
// `max_age_days: 30`); "newer" sorts ahead of "older" by updated_at.
let eligible_newer_at = now - Duration::hours(13);
let eligible_older_at = now - Duration::hours(14);
let current_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("current thread id");
let up_to_date_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("up-to-date thread id");
let stale_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("stale thread id");
let worker_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("worker id");
// The "current" thread is updated right now; startup claiming excludes the
// current thread id and threads that have not been idle long enough.
let mut current =
test_thread_metadata(&codex_home, current_thread_id, codex_home.join("current"));
current.created_at = now;
current.updated_at = now;
runtime
.upsert_thread(&current)
.await
.expect("upsert current thread");
let mut up_to_date = test_thread_metadata(
&codex_home,
up_to_date_thread_id,
codex_home.join("up-to-date"),
);
up_to_date.created_at = eligible_newer_at;
up_to_date.updated_at = eligible_newer_at;
runtime
.upsert_thread(&up_to_date)
.await
.expect("upsert up-to-date thread");
// Seed: claim and complete a stage-1 job for the "up-to-date" thread at
// exactly its updated_at watermark, so its memory is no longer stale.
let up_to_date_claim = runtime
.try_claim_stage1_job(
up_to_date_thread_id,
worker_id,
up_to_date.updated_at.timestamp(),
3600,
64,
)
.await
.expect("claim up-to-date thread for seed");
let up_to_date_token = match up_to_date_claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!("unexpected seed claim outcome: {other:?}"),
};
assert!(
runtime
.mark_stage1_job_succeeded(
up_to_date_thread_id,
up_to_date_token.as_str(),
up_to_date.updated_at.timestamp(),
"raw",
"summary",
)
.await
.expect("mark up-to-date thread succeeded"),
"seed stage1 success should complete for up-to-date thread"
);
// The stale thread has no stage-1 output at all, so it remains claimable.
let mut stale =
test_thread_metadata(&codex_home, stale_thread_id, codex_home.join("stale"));
stale.created_at = eligible_older_at;
stale.updated_at = eligible_older_at;
runtime
.upsert_thread(&stale)
.await
.expect("upsert stale thread");
let allowed_sources = vec!["cli".to_string()];
// `scan_limit: 1` is the crux: if the up-to-date (newer) thread were not
// filtered out by the SQL staleness predicate it would consume the single
// scan slot and the stale thread would never be claimed.
let claims = runtime
.claim_stage1_jobs_for_startup(
current_thread_id,
Stage1StartupClaimParams {
scan_limit: 1,
max_claimed: 1,
max_age_days: 30,
min_rollout_idle_hours: 12,
allowed_sources: allowed_sources.as_slice(),
lease_seconds: 3600,
},
)
.await
.expect("claim stage1 startup jobs");
assert_eq!(claims.len(), 1);
assert_eq!(claims[0].thread.id, stale_thread_id);
// Best-effort cleanup of the temp CODEX_HOME.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn claim_stage1_jobs_enforces_global_running_cap() {
let codex_home = unique_temp_dir();
@ -1559,6 +1657,92 @@ WHERE kind = 'memory_stage1'
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies that successive startup passes each claim a full `max_claimed`
// batch: completing the first 64 jobs advances their watermarks so the second
// pass finds 64 fresh (still-stale) threads to claim.
#[tokio::test]
async fn claim_stage1_jobs_processes_two_full_batches_across_startup_passes() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let current_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("current thread id");
// The current thread is excluded from startup claiming (and too recent).
let mut current =
test_thread_metadata(&codex_home, current_thread_id, codex_home.join("current"));
current.created_at = Utc::now();
current.updated_at = Utc::now();
runtime
.upsert_thread(&current)
.await
.expect("upsert current");
// 200 eligible threads, idle > 12h; each gets a slightly older timestamp
// so ordering by updated_at is deterministic.
let eligible_at = Utc::now() - Duration::hours(13);
for idx in 0..200 {
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
let mut metadata = test_thread_metadata(
&codex_home,
thread_id,
codex_home.join(format!("thread-{idx}")),
);
metadata.created_at = eligible_at - Duration::seconds(idx as i64);
metadata.updated_at = eligible_at - Duration::seconds(idx as i64);
runtime
.upsert_thread(&metadata)
.await
.expect("upsert eligible thread");
}
let allowed_sources = vec!["cli".to_string()];
// First pass: scan limit comfortably covers all 200 threads, claims cap
// out at `max_claimed: 64`.
let first_claims = runtime
.claim_stage1_jobs_for_startup(
current_thread_id,
Stage1StartupClaimParams {
scan_limit: 5_000,
max_claimed: 64,
max_age_days: 30,
min_rollout_idle_hours: 12,
allowed_sources: allowed_sources.as_slice(),
lease_seconds: 3_600,
},
)
.await
.expect("first stage1 startup claim");
assert_eq!(first_claims.len(), 64);
// Complete every first-batch job at its thread's watermark so those
// threads become up-to-date and drop out of the next pass.
for claim in first_claims {
assert!(
runtime
.mark_stage1_job_succeeded(
claim.thread.id,
claim.ownership_token.as_str(),
claim.thread.updated_at.timestamp(),
"raw",
"summary",
)
.await
.expect("mark first-batch stage1 success"),
"first batch stage1 completion should succeed"
);
}
// Second pass with identical params must again fill a whole batch from
// the remaining 136 stale threads.
let second_claims = runtime
.claim_stage1_jobs_for_startup(
current_thread_id,
Stage1StartupClaimParams {
scan_limit: 5_000,
max_claimed: 64,
max_age_days: 30,
min_rollout_idle_hours: 12,
allowed_sources: allowed_sources.as_slice(),
lease_seconds: 3_600,
},
)
.await
.expect("second stage1 startup claim");
assert_eq!(second_claims.len(), 64);
// Best-effort cleanup of the temp CODEX_HOME.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn stage1_output_cascades_on_thread_delete() {
let codex_home = unique_temp_dir();
@ -1695,6 +1879,88 @@ WHERE kind = 'memory_stage1'
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies that exhausting the stage-1 retry budget at one source watermark
// does not permanently block the thread: a claim with a newer
// `source_updated_at` resets the retry budget and is claimable again.
#[tokio::test]
async fn stage1_retry_exhaustion_does_not_block_newer_watermark() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id");
let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id");
runtime
.upsert_thread(&test_thread_metadata(
&codex_home,
thread_id,
codex_home.join("workspace"),
))
.await
.expect("upsert thread");
// Burn the full retry budget (3 attempts) at watermark 100: each claim
// succeeds, then is marked failed with a zero-second retry delay so the
// next claim is not blocked by backoff.
for attempt in 0..3 {
let claim = runtime
.try_claim_stage1_job(thread_id, owner, 100, 3_600, 64)
.await
.expect("claim stage1 for retry exhaustion");
let ownership_token = match claim {
Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token,
other => panic!(
"attempt {} should claim stage1 before retries are exhausted: {other:?}",
attempt + 1
),
};
assert!(
runtime
.mark_stage1_job_failed(thread_id, ownership_token.as_str(), "boom", 0)
.await
.expect("mark stage1 failed"),
"attempt {} should decrement retry budget",
attempt + 1
);
}
// Same watermark, budget exhausted: claim must be refused.
let exhausted_claim = runtime
.try_claim_stage1_job(thread_id, owner, 100, 3_600, 64)
.await
.expect("claim stage1 after retry exhaustion");
assert_eq!(
exhausted_claim,
Stage1JobClaimOutcome::SkippedRetryExhausted
);
// Advancing the watermark (100 -> 101) must make the job claimable again.
let newer_source_claim = runtime
.try_claim_stage1_job(thread_id, owner, 101, 3_600, 64)
.await
.expect("claim stage1 with newer source watermark");
assert!(
matches!(newer_source_claim, Stage1JobClaimOutcome::Claimed { .. }),
"newer source watermark should reset retry budget and be claimable"
);
// Inspect the jobs row directly: the reset should restore the full retry
// budget (3) and record the newer watermark (101).
let job_row = sqlx::query(
"SELECT retry_remaining, input_watermark FROM jobs WHERE kind = ? AND job_key = ?",
)
.bind("memory_stage1")
.bind(thread_id.to_string())
.fetch_one(runtime.pool.as_ref())
.await
.expect("load stage1 job row after newer-source claim");
assert_eq!(
job_row
.try_get::<i64, _>("retry_remaining")
.expect("retry_remaining"),
3
);
assert_eq!(
job_row
.try_get::<i64, _>("input_watermark")
.expect("input_watermark"),
101
);
// Best-effort cleanup of the temp CODEX_HOME.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn phase2_global_consolidation_reruns_when_watermark_advances() {
let codex_home = unique_temp_dir();
@ -2078,6 +2344,65 @@ VALUES (?, ?, ?, ?, ?)
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
// Verifies that enqueueing a phase-2 consolidation with a watermark BELOW the
// last successful one (a backfilled input) still marks the global job dirty,
// i.e. the enqueue advances `input_watermark` past the last success rather
// than being swallowed by a plain max().
#[tokio::test]
async fn phase2_backfilled_inputs_below_last_success_still_become_dirty() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
// Seed: enqueue at watermark 500, claim it, and complete it so the job's
// last_success_watermark becomes 500.
runtime
.enqueue_global_consolidation(500)
.await
.expect("enqueue initial consolidation");
let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner a");
let claim_a = runtime
.try_claim_global_phase2_job(owner_a, 3_600)
.await
.expect("claim initial consolidation");
let token_a = match claim_a {
Phase2JobClaimOutcome::Claimed {
ownership_token,
input_watermark,
} => {
assert_eq!(input_watermark, 500);
ownership_token
}
other => panic!("unexpected initial phase2 claim outcome: {other:?}"),
};
assert!(
runtime
.mark_global_phase2_job_succeeded(token_a.as_str(), 500)
.await
.expect("mark initial phase2 success"),
"initial phase2 success should finalize"
);
// Backfill: enqueue with an OLDER watermark (400 < 500). New work exists,
// so the job must still become claimable.
runtime
.enqueue_global_consolidation(400)
.await
.expect("enqueue backfilled consolidation");
let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b");
let claim_b = runtime
.try_claim_global_phase2_job(owner_b, 3_600)
.await
.expect("claim backfilled consolidation");
match claim_b {
Phase2JobClaimOutcome::Claimed {
input_watermark, ..
} => {
// The claim is only possible because the enqueue bumped the dirty
// watermark beyond the last success (> 500), despite 400 < 500.
assert!(
input_watermark > 500,
"backfilled enqueue should advance dirty watermark beyond last success"
);
}
other => panic!("unexpected backfilled phase2 claim outcome: {other:?}"),
}
// Best-effort cleanup of the temp CODEX_HOME.
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn phase2_failure_fallback_updates_unowned_running_job() {
let codex_home = unique_temp_dir();

View file

@ -18,6 +18,11 @@ const MEMORY_CONSOLIDATION_JOB_KEY: &str = "global";
const DEFAULT_RETRY_REMAINING: i64 = 3;
impl StateRuntime {
/// Deletes all persisted memory state in one transaction.
///
/// This removes every `stage1_outputs` row and all `jobs` rows for the
/// stage-1 (`memory_stage1`) and phase-2 (`memory_consolidate_global`)
/// memory pipelines.
pub async fn clear_memory_data(&self) -> anyhow::Result<()> {
let mut tx = self.pool.begin().await?;
@ -44,6 +49,22 @@ WHERE kind = ? OR kind = ?
Ok(())
}
/// Selects and claims stage-1 startup jobs for stale threads.
///
/// Query behavior:
/// - starts from `threads` filtered to active threads and allowed sources
/// (`push_thread_filters`)
/// - excludes the current thread id
/// - keeps only threads in the age window:
/// `updated_at >= now - max_age_days` and `updated_at <= now - min_rollout_idle_hours`
/// - keeps only threads whose memory is stale:
/// `COALESCE(stage1_outputs.source_updated_at, -1) < threads.updated_at` and
/// `COALESCE(jobs.last_success_watermark, -1) < threads.updated_at`
/// - orders by `updated_at DESC, id DESC` and applies `scan_limit`
///
/// For each selected thread, this function calls [`Self::try_claim_stage1_job`]
/// with `source_updated_at = thread.updated_at.timestamp()` and returns up to
/// `max_claimed` successful claims.
pub async fn claim_stage1_jobs_for_startup(
&self,
current_thread_id: ThreadId,
@ -87,6 +108,16 @@ SELECT
git_branch,
git_origin_url
FROM threads
LEFT JOIN stage1_outputs
ON stage1_outputs.thread_id = threads.id
LEFT JOIN jobs
ON jobs.kind =
"#,
);
builder.push_bind(JOB_KIND_MEMORY_STAGE1);
builder.push(
r#"
AND jobs.job_key = threads.id
"#,
);
push_thread_filters(
@ -104,6 +135,8 @@ FROM threads
.push(" AND updated_at >= ")
.push_bind(max_age_cutoff);
builder.push(" AND updated_at <= ").push_bind(idle_cutoff);
builder.push(" AND COALESCE(stage1_outputs.source_updated_at, -1) < updated_at");
builder.push(" AND COALESCE(jobs.last_success_watermark, -1) < updated_at");
push_thread_order_and_limit(&mut builder, SortKey::UpdatedAt, scan_limit);
let items = builder
@ -141,25 +174,12 @@ FROM threads
Ok(claimed)
}
/// Loads the persisted stage-1 output for `thread_id`, if one exists.
///
/// Returns `Ok(None)` when no `stage1_outputs` row is present; otherwise
/// decodes the row into a [`Stage1Output`], propagating any decode error.
pub async fn get_stage1_output(
    &self,
    thread_id: ThreadId,
) -> anyhow::Result<Option<Stage1Output>> {
    let maybe_row = sqlx::query(
        r#"
SELECT thread_id, source_updated_at, raw_memory, rollout_summary, generated_at
FROM stage1_outputs
WHERE thread_id = ?
"#,
    )
    .bind(thread_id.to_string())
    .fetch_optional(self.pool.as_ref())
    .await?;
    match maybe_row {
        // Decode in two explicit steps: raw row -> typed row -> domain value.
        Some(row) => {
            let typed = Stage1OutputRow::try_from_row(&row)?;
            Ok(Some(Stage1Output::try_from(typed)?))
        }
        None => Ok(None),
    }
}
/// Lists the most recent non-empty stage-1 outputs for global consolidation.
///
/// Query behavior:
/// - filters out rows where both `raw_memory` and `rollout_summary` are blank
/// - orders by `source_updated_at DESC, thread_id DESC`
/// - applies `LIMIT n`
pub async fn list_stage1_outputs_for_global(
&self,
n: usize,
@ -186,6 +206,22 @@ LIMIT ?
.collect::<Result<Vec<_>, _>>()
}
/// Attempts to claim a stage-1 job for a thread at `source_updated_at`.
///
/// Claim semantics:
/// - skips as up-to-date when either:
/// - `stage1_outputs.source_updated_at >= source_updated_at`, or
/// - `jobs.last_success_watermark >= source_updated_at`
/// - inserts or updates a `jobs` row to `running` only when:
/// - global running job count for `memory_stage1` is below `max_running_jobs`
/// - existing row is not actively running with a valid lease
/// - retry backoff (if present) has elapsed, or `source_updated_at` advanced
/// - retries remain, or `source_updated_at` advanced (which resets retries)
///
/// The update path refreshes ownership token, lease, and `input_watermark`.
/// If claiming fails, a follow-up read maps current row state to a precise
/// skip outcome (`SkippedRunning`, `SkippedRetryBackoff`, or
/// `SkippedRetryExhausted`).
pub async fn try_claim_stage1_job(
&self,
thread_id: ThreadId,
@ -274,12 +310,23 @@ ON CONFLICT(kind, job_key) DO UPDATE SET
finished_at = NULL,
lease_until = excluded.lease_until,
retry_at = NULL,
retry_remaining = CASE
WHEN excluded.input_watermark > COALESCE(jobs.input_watermark, -1) THEN ?
ELSE jobs.retry_remaining
END,
last_error = NULL,
input_watermark = excluded.input_watermark
WHERE
(jobs.status != 'running' OR jobs.lease_until IS NULL OR jobs.lease_until <= excluded.started_at)
AND (jobs.retry_at IS NULL OR jobs.retry_at <= excluded.started_at)
AND jobs.retry_remaining > 0
AND (
jobs.retry_at IS NULL
OR jobs.retry_at <= excluded.started_at
OR excluded.input_watermark > COALESCE(jobs.input_watermark, -1)
)
AND (
jobs.retry_remaining > 0
OR excluded.input_watermark > COALESCE(jobs.input_watermark, -1)
)
AND (
SELECT COUNT(*)
FROM jobs AS running_jobs
@ -302,6 +349,7 @@ WHERE
.bind(JOB_KIND_MEMORY_STAGE1)
.bind(now)
.bind(max_running_jobs)
.bind(DEFAULT_RETRY_REMAINING)
.bind(max_running_jobs)
.execute(&mut *tx)
.await?
@ -348,6 +396,15 @@ WHERE kind = ? AND job_key = ?
Ok(Stage1JobClaimOutcome::SkippedRunning)
}
/// Marks a claimed stage-1 job successful and upserts generated output.
///
/// Transaction behavior:
/// - updates `jobs` only for the currently owned running row
/// - sets `status='done'` and `last_success_watermark = input_watermark`
/// - upserts `stage1_outputs` for the thread, replacing existing output only
/// when `source_updated_at` is newer or equal
/// - enqueues/advances the global phase-2 job watermark using
/// `source_updated_at`
pub async fn mark_stage1_job_succeeded(
&self,
thread_id: ThreadId,
@ -417,6 +474,14 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
Ok(true)
}
/// Marks a claimed stage-1 job successful when extraction produced no output.
///
/// Transaction behavior:
/// - updates `jobs` only for the currently owned running row
/// - sets `status='done'` and `last_success_watermark = input_watermark`
/// - deletes any existing `stage1_outputs` row for the thread
/// - enqueues/advances the global phase-2 job watermark using the claimed
/// `input_watermark`
pub async fn mark_stage1_job_succeeded_no_output(
&self,
thread_id: ThreadId,
@ -482,6 +547,13 @@ WHERE thread_id = ?
Ok(true)
}
/// Marks a claimed stage-1 job as failed and schedules retry backoff.
///
/// Query behavior:
/// - updates only the owned running row for `(kind='memory_stage1', job_key)`
/// - sets `status='error'`, clears lease, writes `last_error`
/// - decrements `retry_remaining`
/// - sets `retry_at = now + retry_delay_seconds`
pub async fn mark_stage1_job_failed(
&self,
thread_id: ThreadId,
@ -520,11 +592,25 @@ WHERE kind = ? AND job_key = ?
Ok(rows_affected > 0)
}
/// Enqueues or advances the global phase-2 consolidation job watermark.
///
/// The underlying upsert keeps the job `running` when already running, resets
/// `pending/error` jobs to `pending`, and advances `input_watermark` so each
/// enqueue marks new consolidation work even when `source_updated_at` is
/// older than prior maxima.
pub async fn enqueue_global_consolidation(&self, input_watermark: i64) -> anyhow::Result<()> {
enqueue_global_consolidation_with_executor(self.pool.as_ref(), input_watermark).await
}
/// Try to claim the global phase-2 consolidation job.
/// Attempts to claim the global phase-2 consolidation job.
///
/// Claim semantics:
/// - reads the singleton global job row (`kind='memory_consolidate_global'`)
/// - returns `SkippedNotDirty` when `input_watermark <= last_success_watermark`
/// - returns `SkippedNotDirty` when retries are exhausted or retry backoff is active
/// - returns `SkippedRunning` when an active running lease exists
/// - otherwise updates the row to `running`, sets ownership + lease, and
/// returns `Claimed`
pub async fn try_claim_global_phase2_job(
&self,
worker_id: ThreadId,
@ -623,6 +709,11 @@ WHERE kind = ? AND job_key = ?
}
}
/// Extends the lease for an owned running phase-2 global job.
///
/// Query behavior:
/// - `UPDATE jobs SET lease_until = ?` for the singleton global row
/// - requires `status='running'` and matching `ownership_token`
pub async fn heartbeat_global_phase2_job(
&self,
ownership_token: &str,
@ -649,6 +740,13 @@ WHERE kind = ? AND job_key = ?
Ok(rows_affected > 0)
}
/// Marks the owned running global phase-2 job as succeeded.
///
/// Query behavior:
/// - updates only the owned running singleton global row
/// - sets `status='done'`, clears lease/errors
/// - advances `last_success_watermark` to
/// `max(existing_last_success_watermark, completed_watermark)`
pub async fn mark_global_phase2_job_succeeded(
&self,
ownership_token: &str,
@ -680,6 +778,13 @@ WHERE kind = ? AND job_key = ?
Ok(rows_affected > 0)
}
/// Marks the owned running global phase-2 job as failed and schedules retry.
///
/// Query behavior:
/// - updates only the owned running singleton global row
/// - sets `status='error'`, clears lease
/// - writes failure reason and retry time
/// - decrements `retry_remaining`
pub async fn mark_global_phase2_job_failed(
&self,
ownership_token: &str,
@ -715,6 +820,12 @@ WHERE kind = ? AND job_key = ?
Ok(rows_affected > 0)
}
/// Fallback failure finalization when ownership may have been lost.
///
/// Query behavior:
/// - same state transition as [`Self::mark_global_phase2_job_failed`]
/// - matches rows where `ownership_token = ? OR ownership_token IS NULL`
/// - allows recovering a stuck unowned running row
pub async fn mark_global_phase2_job_failed_if_unowned(
&self,
ownership_token: &str,
@ -786,7 +897,11 @@ ON CONFLICT(kind, job_key) DO UPDATE SET
ELSE NULL
END,
retry_remaining = max(jobs.retry_remaining, excluded.retry_remaining),
input_watermark = max(COALESCE(jobs.input_watermark, 0), excluded.input_watermark)
input_watermark = CASE
WHEN excluded.input_watermark > COALESCE(jobs.input_watermark, 0)
THEN excluded.input_watermark
ELSE COALESCE(jobs.input_watermark, 0) + 1
END
"#,
)
.bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)