From c76bc8d1ce351405b46459a2711218a5ec10bf2f Mon Sep 17 00:00:00 2001 From: jif-oai Date: Fri, 27 Feb 2026 12:49:03 +0100 Subject: [PATCH] feat: use the memory mode for phase 1 extraction (#13002) --- codex-rs/state/src/runtime/memories.rs | 73 ++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 5ebb2be88..908919d0a 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -97,6 +97,7 @@ WHERE thread_id = ? /// Query behavior: /// - starts from `threads` filtered to active threads and allowed sources /// (`push_thread_filters`) + /// - excludes threads with `memory_mode != 'enabled'` /// - excludes the current thread id /// - keeps only threads in the age window: /// `updated_at >= now - max_age_days` and `updated_at <= now - min_rollout_idle_hours` @@ -174,6 +175,7 @@ LEFT JOIN jobs SortKey::UpdatedAt, None, ); + builder.push(" AND threads.memory_mode = 'enabled'"); builder .push(" AND id != ") .push_bind(current_thread_id.as_str()); @@ -1591,6 +1593,77 @@ mod tests { let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn claim_stage1_jobs_skips_threads_with_disabled_memory_mode() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let now = Utc::now(); + let eligible_at = now - Duration::hours(13); + + let current_thread_id = + ThreadId::from_string(&Uuid::new_v4().to_string()).expect("current thread id"); + let disabled_thread_id = + ThreadId::from_string(&Uuid::new_v4().to_string()).expect("disabled thread id"); + let enabled_thread_id = + ThreadId::from_string(&Uuid::new_v4().to_string()).expect("enabled thread id"); + + let mut current = + test_thread_metadata(&codex_home, current_thread_id, codex_home.join("current")); + current.created_at = now; + current.updated_at = now; + runtime + .upsert_thread(¤t) + .await + .expect("upsert current thread"); + + let mut disabled = + test_thread_metadata(&codex_home, disabled_thread_id, codex_home.join("disabled")); + disabled.created_at = eligible_at; + disabled.updated_at = eligible_at; + runtime + .upsert_thread(&disabled) + .await + .expect("upsert disabled thread"); + sqlx::query("UPDATE threads SET memory_mode = 'disabled' WHERE id = ?") + .bind(disabled_thread_id.to_string()) + .execute(runtime.pool.as_ref()) + .await + .expect("disable thread memory mode"); + + let mut enabled = + test_thread_metadata(&codex_home, enabled_thread_id, codex_home.join("enabled")); + enabled.created_at = eligible_at; + enabled.updated_at = eligible_at; + runtime + .upsert_thread(&enabled) + .await + .expect("upsert enabled thread"); + + let allowed_sources = vec!["cli".to_string()]; + let claims = runtime + .claim_stage1_jobs_for_startup( + current_thread_id, + Stage1StartupClaimParams { + scan_limit: 10, + max_claimed: 10, + max_age_days: 30, + min_rollout_idle_hours: 12, + allowed_sources: allowed_sources.as_slice(), + lease_seconds: 3600, + }, + ) + .await + .expect("claim stage1 startup jobs"); + + assert_eq!(claims.len(), 1); + assert_eq!(claims[0].thread.id, enabled_thread_id); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn claim_stage1_jobs_enforces_global_running_cap() { let codex_home = unique_temp_dir();