feat: use the memory mode for phase 1 extraction (#13002)

This commit is contained in:
jif-oai 2026-02-27 12:49:03 +01:00 committed by GitHub
parent bbd237348d
commit c76bc8d1ce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -97,6 +97,7 @@ WHERE thread_id = ?
/// Query behavior:
/// - starts from `threads` filtered to active threads and allowed sources
/// (`push_thread_filters`)
/// - excludes threads with `memory_mode != 'enabled'`
/// - excludes the current thread id
/// - keeps only threads in the age window:
/// `updated_at >= now - max_age_days` and `updated_at <= now - min_rollout_idle_hours`
@ -174,6 +175,7 @@ LEFT JOIN jobs
SortKey::UpdatedAt,
None,
);
builder.push(" AND threads.memory_mode = 'enabled'");
builder
.push(" AND id != ")
.push_bind(current_thread_id.as_str());
@ -1591,6 +1593,77 @@ mod tests {
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn claim_stage1_jobs_skips_threads_with_disabled_memory_mode() {
let codex_home = unique_temp_dir();
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
.await
.expect("initialize runtime");
let now = Utc::now();
let eligible_at = now - Duration::hours(13);
let current_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("current thread id");
let disabled_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("disabled thread id");
let enabled_thread_id =
ThreadId::from_string(&Uuid::new_v4().to_string()).expect("enabled thread id");
let mut current =
test_thread_metadata(&codex_home, current_thread_id, codex_home.join("current"));
current.created_at = now;
current.updated_at = now;
runtime
.upsert_thread(&current)
.await
.expect("upsert current thread");
let mut disabled =
test_thread_metadata(&codex_home, disabled_thread_id, codex_home.join("disabled"));
disabled.created_at = eligible_at;
disabled.updated_at = eligible_at;
runtime
.upsert_thread(&disabled)
.await
.expect("upsert disabled thread");
sqlx::query("UPDATE threads SET memory_mode = 'disabled' WHERE id = ?")
.bind(disabled_thread_id.to_string())
.execute(runtime.pool.as_ref())
.await
.expect("disable thread memory mode");
let mut enabled =
test_thread_metadata(&codex_home, enabled_thread_id, codex_home.join("enabled"));
enabled.created_at = eligible_at;
enabled.updated_at = eligible_at;
runtime
.upsert_thread(&enabled)
.await
.expect("upsert enabled thread");
let allowed_sources = vec!["cli".to_string()];
let claims = runtime
.claim_stage1_jobs_for_startup(
current_thread_id,
Stage1StartupClaimParams {
scan_limit: 10,
max_claimed: 10,
max_age_days: 30,
min_rollout_idle_hours: 12,
allowed_sources: allowed_sources.as_slice(),
lease_seconds: 3600,
},
)
.await
.expect("claim stage1 startup jobs");
assert_eq!(claims.len(), 1);
assert_eq!(claims[0].thread.id, enabled_thread_id);
let _ = tokio::fs::remove_dir_all(codex_home).await;
}
#[tokio::test]
async fn claim_stage1_jobs_enforces_global_running_cap() {
let codex_home = unique_temp_dir();