diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index d698bd545..b66a81462 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -1432,6 +1432,104 @@ WHERE id = 1 let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn claim_stage1_jobs_prefilters_threads_with_up_to_date_memory() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let now = Utc::now(); + let eligible_newer_at = now - Duration::hours(13); + let eligible_older_at = now - Duration::hours(14); + + let current_thread_id = + ThreadId::from_string(&Uuid::new_v4().to_string()).expect("current thread id"); + let up_to_date_thread_id = + ThreadId::from_string(&Uuid::new_v4().to_string()).expect("up-to-date thread id"); + let stale_thread_id = + ThreadId::from_string(&Uuid::new_v4().to_string()).expect("stale thread id"); + let worker_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("worker id"); + + let mut current = + test_thread_metadata(&codex_home, current_thread_id, codex_home.join("current")); + current.created_at = now; + current.updated_at = now; + runtime + .upsert_thread(¤t) + .await + .expect("upsert current thread"); + + let mut up_to_date = test_thread_metadata( + &codex_home, + up_to_date_thread_id, + codex_home.join("up-to-date"), + ); + up_to_date.created_at = eligible_newer_at; + up_to_date.updated_at = eligible_newer_at; + runtime + .upsert_thread(&up_to_date) + .await + .expect("upsert up-to-date thread"); + + let up_to_date_claim = runtime + .try_claim_stage1_job( + up_to_date_thread_id, + worker_id, + up_to_date.updated_at.timestamp(), + 3600, + 64, + ) + .await + .expect("claim up-to-date thread for seed"); + let up_to_date_token = match up_to_date_claim { + Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!("unexpected seed claim 
outcome: {other:?}"), + }; + assert!( + runtime + .mark_stage1_job_succeeded( + up_to_date_thread_id, + up_to_date_token.as_str(), + up_to_date.updated_at.timestamp(), + "raw", + "summary", + ) + .await + .expect("mark up-to-date thread succeeded"), + "seed stage1 success should complete for up-to-date thread" + ); + + let mut stale = + test_thread_metadata(&codex_home, stale_thread_id, codex_home.join("stale")); + stale.created_at = eligible_older_at; + stale.updated_at = eligible_older_at; + runtime + .upsert_thread(&stale) + .await + .expect("upsert stale thread"); + + let allowed_sources = vec!["cli".to_string()]; + let claims = runtime + .claim_stage1_jobs_for_startup( + current_thread_id, + Stage1StartupClaimParams { + scan_limit: 1, + max_claimed: 1, + max_age_days: 30, + min_rollout_idle_hours: 12, + allowed_sources: allowed_sources.as_slice(), + lease_seconds: 3600, + }, + ) + .await + .expect("claim stage1 startup jobs"); + assert_eq!(claims.len(), 1); + assert_eq!(claims[0].thread.id, stale_thread_id); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn claim_stage1_jobs_enforces_global_running_cap() { let codex_home = unique_temp_dir(); @@ -1559,6 +1657,92 @@ WHERE kind = 'memory_stage1' let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn claim_stage1_jobs_processes_two_full_batches_across_startup_passes() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let current_thread_id = + ThreadId::from_string(&Uuid::new_v4().to_string()).expect("current thread id"); + let mut current = + test_thread_metadata(&codex_home, current_thread_id, codex_home.join("current")); + current.created_at = Utc::now(); + current.updated_at = Utc::now(); + runtime + .upsert_thread(¤t) + .await + .expect("upsert current"); + + let eligible_at = Utc::now() - Duration::hours(13); + for idx in 
0..200 { + let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + let mut metadata = test_thread_metadata( + &codex_home, + thread_id, + codex_home.join(format!("thread-{idx}")), + ); + metadata.created_at = eligible_at - Duration::seconds(idx as i64); + metadata.updated_at = eligible_at - Duration::seconds(idx as i64); + runtime + .upsert_thread(&metadata) + .await + .expect("upsert eligible thread"); + } + + let allowed_sources = vec!["cli".to_string()]; + let first_claims = runtime + .claim_stage1_jobs_for_startup( + current_thread_id, + Stage1StartupClaimParams { + scan_limit: 5_000, + max_claimed: 64, + max_age_days: 30, + min_rollout_idle_hours: 12, + allowed_sources: allowed_sources.as_slice(), + lease_seconds: 3_600, + }, + ) + .await + .expect("first stage1 startup claim"); + assert_eq!(first_claims.len(), 64); + + for claim in first_claims { + assert!( + runtime + .mark_stage1_job_succeeded( + claim.thread.id, + claim.ownership_token.as_str(), + claim.thread.updated_at.timestamp(), + "raw", + "summary", + ) + .await + .expect("mark first-batch stage1 success"), + "first batch stage1 completion should succeed" + ); + } + + let second_claims = runtime + .claim_stage1_jobs_for_startup( + current_thread_id, + Stage1StartupClaimParams { + scan_limit: 5_000, + max_claimed: 64, + max_age_days: 30, + min_rollout_idle_hours: 12, + allowed_sources: allowed_sources.as_slice(), + lease_seconds: 3_600, + }, + ) + .await + .expect("second stage1 startup claim"); + assert_eq!(second_claims.len(), 64); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn stage1_output_cascades_on_thread_delete() { let codex_home = unique_temp_dir(); @@ -1695,6 +1879,88 @@ WHERE kind = 'memory_stage1' let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn stage1_retry_exhaustion_does_not_block_newer_watermark() { + let codex_home = unique_temp_dir(); + let runtime = 
StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let thread_id = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("thread id"); + let owner = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner id"); + runtime + .upsert_thread(&test_thread_metadata( + &codex_home, + thread_id, + codex_home.join("workspace"), + )) + .await + .expect("upsert thread"); + + for attempt in 0..3 { + let claim = runtime + .try_claim_stage1_job(thread_id, owner, 100, 3_600, 64) + .await + .expect("claim stage1 for retry exhaustion"); + let ownership_token = match claim { + Stage1JobClaimOutcome::Claimed { ownership_token } => ownership_token, + other => panic!( + "attempt {} should claim stage1 before retries are exhausted: {other:?}", + attempt + 1 + ), + }; + assert!( + runtime + .mark_stage1_job_failed(thread_id, ownership_token.as_str(), "boom", 0) + .await + .expect("mark stage1 failed"), + "attempt {} should decrement retry budget", + attempt + 1 + ); + } + + let exhausted_claim = runtime + .try_claim_stage1_job(thread_id, owner, 100, 3_600, 64) + .await + .expect("claim stage1 after retry exhaustion"); + assert_eq!( + exhausted_claim, + Stage1JobClaimOutcome::SkippedRetryExhausted + ); + + let newer_source_claim = runtime + .try_claim_stage1_job(thread_id, owner, 101, 3_600, 64) + .await + .expect("claim stage1 with newer source watermark"); + assert!( + matches!(newer_source_claim, Stage1JobClaimOutcome::Claimed { .. }), + "newer source watermark should reset retry budget and be claimable" + ); + + let job_row = sqlx::query( + "SELECT retry_remaining, input_watermark FROM jobs WHERE kind = ? 
AND job_key = ?", + ) + .bind("memory_stage1") + .bind(thread_id.to_string()) + .fetch_one(runtime.pool.as_ref()) + .await + .expect("load stage1 job row after newer-source claim"); + assert_eq!( + job_row + .try_get::<i64>("retry_remaining") + .expect("retry_remaining"), + 3 + ); + assert_eq!( + job_row + .try_get::<i64>("input_watermark") + .expect("input_watermark"), + 101 + ); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn phase2_global_consolidation_reruns_when_watermark_advances() { let codex_home = unique_temp_dir(); @@ -2078,6 +2344,65 @@ VALUES (?, ?, ?, ?, ?) let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn phase2_backfilled_inputs_below_last_success_still_become_dirty() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + runtime + .enqueue_global_consolidation(500) + .await + .expect("enqueue initial consolidation"); + let owner_a = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner a"); + let claim_a = runtime + .try_claim_global_phase2_job(owner_a, 3_600) + .await + .expect("claim initial consolidation"); + let token_a = match claim_a { + Phase2JobClaimOutcome::Claimed { + ownership_token, + input_watermark, + } => { + assert_eq!(input_watermark, 500); + ownership_token + } + other => panic!("unexpected initial phase2 claim outcome: {other:?}"), + }; + assert!( + runtime + .mark_global_phase2_job_succeeded(token_a.as_str(), 500) + .await + .expect("mark initial phase2 success"), + "initial phase2 success should finalize" + ); + + runtime + .enqueue_global_consolidation(400) + .await + .expect("enqueue backfilled consolidation"); + + let owner_b = ThreadId::from_string(&Uuid::new_v4().to_string()).expect("owner b"); + let claim_b = runtime + .try_claim_global_phase2_job(owner_b, 3_600) + .await + .expect("claim backfilled consolidation"); + match claim_b 
{ + Phase2JobClaimOutcome::Claimed { + input_watermark, .. + } => { + assert!( + input_watermark > 500, + "backfilled enqueue should advance dirty watermark beyond last success" + ); + } + other => panic!("unexpected backfilled phase2 claim outcome: {other:?}"), + } + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn phase2_failure_fallback_updates_unowned_running_job() { let codex_home = unique_temp_dir(); diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 162543bda..19f277c82 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -18,6 +18,11 @@ const MEMORY_CONSOLIDATION_JOB_KEY: &str = "global"; const DEFAULT_RETRY_REMAINING: i64 = 3; impl StateRuntime { + /// Deletes all persisted memory state in one transaction. + /// + /// This removes every `stage1_outputs` row and all `jobs` rows for the + /// stage-1 (`memory_stage1`) and phase-2 (`memory_consolidate_global`) + /// memory pipelines. pub async fn clear_memory_data(&self) -> anyhow::Result<()> { let mut tx = self.pool.begin().await?; @@ -44,6 +49,22 @@ WHERE kind = ? OR kind = ? Ok(()) } + /// Selects and claims stage-1 startup jobs for stale threads. 
+ /// + /// Query behavior: + /// - starts from `threads` filtered to active threads and allowed sources + /// (`push_thread_filters`) + /// - excludes the current thread id + /// - keeps only threads in the age window: + /// `updated_at >= now - max_age_days` and `updated_at <= now - min_rollout_idle_hours` + /// - keeps only threads whose memory is stale: + /// `COALESCE(stage1_outputs.source_updated_at, -1) < threads.updated_at` and + /// `COALESCE(jobs.last_success_watermark, -1) < threads.updated_at` + /// - orders by `updated_at DESC, id DESC` and applies `scan_limit` + /// + /// For each selected thread, this function calls [`Self::try_claim_stage1_job`] + /// with `source_updated_at = thread.updated_at.timestamp()` and returns up to + /// `max_claimed` successful claims. pub async fn claim_stage1_jobs_for_startup( &self, current_thread_id: ThreadId, @@ -87,6 +108,16 @@ SELECT git_branch, git_origin_url FROM threads +LEFT JOIN stage1_outputs + ON stage1_outputs.thread_id = threads.id +LEFT JOIN jobs + ON jobs.kind = + "#, + ); + builder.push_bind(JOB_KIND_MEMORY_STAGE1); + builder.push( + r#" + AND jobs.job_key = threads.id "#, ); push_thread_filters( @@ -104,6 +135,8 @@ FROM threads .push(" AND updated_at >= ") .push_bind(max_age_cutoff); builder.push(" AND updated_at <= ").push_bind(idle_cutoff); + builder.push(" AND COALESCE(stage1_outputs.source_updated_at, -1) < updated_at"); + builder.push(" AND COALESCE(jobs.last_success_watermark, -1) < updated_at"); push_thread_order_and_limit(&mut builder, SortKey::UpdatedAt, scan_limit); let items = builder @@ -141,25 +174,12 @@ FROM threads Ok(claimed) } - pub async fn get_stage1_output( - &self, - thread_id: ThreadId, - ) -> anyhow::Result<Option<Stage1Output>> { - let row = sqlx::query( - r#" -SELECT thread_id, source_updated_at, raw_memory, rollout_summary, generated_at -FROM stage1_outputs -WHERE thread_id = ? 
- "#, - ) - .bind(thread_id.to_string()) - .fetch_optional(self.pool.as_ref()) - .await?; - - row.map(|row| Stage1OutputRow::try_from_row(&row).and_then(Stage1Output::try_from)) - .transpose() - } - + /// Lists the most recent non-empty stage-1 outputs for global consolidation. + /// + /// Query behavior: + /// - filters out rows where both `raw_memory` and `rollout_summary` are blank + /// - orders by `source_updated_at DESC, thread_id DESC` + /// - applies `LIMIT n` pub async fn list_stage1_outputs_for_global( &self, n: usize, @@ -186,6 +206,22 @@ LIMIT ? .collect::<Result<Vec<_>, _>>() } + /// Attempts to claim a stage-1 job for a thread at `source_updated_at`. + /// + /// Claim semantics: + /// - skips as up-to-date when either: + /// - `stage1_outputs.source_updated_at >= source_updated_at`, or + /// - `jobs.last_success_watermark >= source_updated_at` + /// - inserts or updates a `jobs` row to `running` only when: + /// - global running job count for `memory_stage1` is below `max_running_jobs` + /// - existing row is not actively running with a valid lease + /// - retry backoff (if present) has elapsed, or `source_updated_at` advanced + /// - retries remain, or `source_updated_at` advanced (which resets retries) + /// + /// The update path refreshes ownership token, lease, and `input_watermark`. + /// If claiming fails, a follow-up read maps current row state to a precise + /// skip outcome (`SkippedRunning`, `SkippedRetryBackoff`, or + /// `SkippedRetryExhausted`). pub async fn try_claim_stage1_job( &self, thread_id: ThreadId, @@ -274,12 +310,23 @@ ON CONFLICT(kind, job_key) DO UPDATE SET finished_at = NULL, lease_until = excluded.lease_until, retry_at = NULL, + retry_remaining = CASE + WHEN excluded.input_watermark > COALESCE(jobs.input_watermark, -1) THEN ? 
+ ELSE jobs.retry_remaining + END, last_error = NULL, input_watermark = excluded.input_watermark WHERE (jobs.status != 'running' OR jobs.lease_until IS NULL OR jobs.lease_until <= excluded.started_at) - AND (jobs.retry_at IS NULL OR jobs.retry_at <= excluded.started_at) - AND jobs.retry_remaining > 0 + AND ( + jobs.retry_at IS NULL + OR jobs.retry_at <= excluded.started_at + OR excluded.input_watermark > COALESCE(jobs.input_watermark, -1) + ) + AND ( + jobs.retry_remaining > 0 + OR excluded.input_watermark > COALESCE(jobs.input_watermark, -1) + ) AND ( SELECT COUNT(*) FROM jobs AS running_jobs @@ -302,6 +349,7 @@ WHERE .bind(JOB_KIND_MEMORY_STAGE1) .bind(now) .bind(max_running_jobs) + .bind(DEFAULT_RETRY_REMAINING) .bind(max_running_jobs) .execute(&mut *tx) .await? @@ -348,6 +396,15 @@ WHERE kind = ? AND job_key = ? Ok(Stage1JobClaimOutcome::SkippedRunning) } + /// Marks a claimed stage-1 job successful and upserts generated output. + /// + /// Transaction behavior: + /// - updates `jobs` only for the currently owned running row + /// - sets `status='done'` and `last_success_watermark = input_watermark` + /// - upserts `stage1_outputs` for the thread, replacing existing output only + /// when `source_updated_at` is newer or equal + /// - enqueues/advances the global phase-2 job watermark using + /// `source_updated_at` pub async fn mark_stage1_job_succeeded( &self, thread_id: ThreadId, @@ -417,6 +474,14 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at Ok(true) } + /// Marks a claimed stage-1 job successful when extraction produced no output. 
+ /// + /// Transaction behavior: + /// - updates `jobs` only for the currently owned running row + /// - sets `status='done'` and `last_success_watermark = input_watermark` + /// - deletes any existing `stage1_outputs` row for the thread + /// - enqueues/advances the global phase-2 job watermark using the claimed + /// `input_watermark` pub async fn mark_stage1_job_succeeded_no_output( &self, thread_id: ThreadId, @@ -482,6 +547,13 @@ WHERE thread_id = ? Ok(true) } + /// Marks a claimed stage-1 job as failed and schedules retry backoff. + /// + /// Query behavior: + /// - updates only the owned running row for `(kind='memory_stage1', job_key)` + /// - sets `status='error'`, clears lease, writes `last_error` + /// - decrements `retry_remaining` + /// - sets `retry_at = now + retry_delay_seconds` pub async fn mark_stage1_job_failed( &self, thread_id: ThreadId, @@ -520,11 +592,25 @@ WHERE kind = ? AND job_key = ? Ok(rows_affected > 0) } + /// Enqueues or advances the global phase-2 consolidation job watermark. + /// + /// The underlying upsert keeps the job `running` when already running, resets + /// `pending/error` jobs to `pending`, and advances `input_watermark` so each + /// enqueue marks new consolidation work even when `source_updated_at` is + /// older than prior maxima. pub async fn enqueue_global_consolidation(&self, input_watermark: i64) -> anyhow::Result<()> { enqueue_global_consolidation_with_executor(self.pool.as_ref(), input_watermark).await } - /// Try to claim the global phase-2 consolidation job. + /// Attempts to claim the global phase-2 consolidation job. 
+ /// + /// Claim semantics: + /// - reads the singleton global job row (`kind='memory_consolidate_global'`) + /// - returns `SkippedNotDirty` when `input_watermark <= last_success_watermark` + /// - returns `SkippedNotDirty` when retries are exhausted or retry backoff is active + /// - returns `SkippedRunning` when an active running lease exists + /// - otherwise updates the row to `running`, sets ownership + lease, and + /// returns `Claimed` pub async fn try_claim_global_phase2_job( &self, worker_id: ThreadId, @@ -623,6 +709,11 @@ WHERE kind = ? AND job_key = ? } } + /// Extends the lease for an owned running phase-2 global job. + /// + /// Query behavior: + /// - `UPDATE jobs SET lease_until = ?` for the singleton global row + /// - requires `status='running'` and matching `ownership_token` pub async fn heartbeat_global_phase2_job( &self, ownership_token: &str, @@ -649,6 +740,13 @@ WHERE kind = ? AND job_key = ? Ok(rows_affected > 0) } + /// Marks the owned running global phase-2 job as succeeded. + /// + /// Query behavior: + /// - updates only the owned running singleton global row + /// - sets `status='done'`, clears lease/errors + /// - advances `last_success_watermark` to + /// `max(existing_last_success_watermark, completed_watermark)` pub async fn mark_global_phase2_job_succeeded( &self, ownership_token: &str, @@ -680,6 +778,13 @@ WHERE kind = ? AND job_key = ? Ok(rows_affected > 0) } + /// Marks the owned running global phase-2 job as failed and schedules retry. + /// + /// Query behavior: + /// - updates only the owned running singleton global row + /// - sets `status='error'`, clears lease + /// - writes failure reason and retry time + /// - decrements `retry_remaining` pub async fn mark_global_phase2_job_failed( &self, ownership_token: &str, @@ -715,6 +820,12 @@ WHERE kind = ? AND job_key = ? Ok(rows_affected > 0) } + /// Fallback failure finalization when ownership may have been lost. 
+ /// + /// Query behavior: + /// - same state transition as [`Self::mark_global_phase2_job_failed`] + /// - matches rows where `ownership_token = ? OR ownership_token IS NULL` + /// - allows recovering a stuck unowned running row pub async fn mark_global_phase2_job_failed_if_unowned( &self, ownership_token: &str, @@ -786,7 +897,11 @@ ON CONFLICT(kind, job_key) DO UPDATE SET ELSE NULL END, retry_remaining = max(jobs.retry_remaining, excluded.retry_remaining), - input_watermark = max(COALESCE(jobs.input_watermark, 0), excluded.input_watermark) + input_watermark = CASE + WHEN excluded.input_watermark > COALESCE(jobs.input_watermark, 0) + THEN excluded.input_watermark + ELSE COALESCE(jobs.input_watermark, 0) + 1 + END "#, ) .bind(JOB_KIND_MEMORY_CONSOLIDATE_GLOBAL)