diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index ccd476b8c..a4bd72433 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -62,6 +62,10 @@ async fn insert_state_db_thread( codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None) .await .expect("state db should initialize"); + runtime + .mark_backfill_complete(None) + .await + .expect("backfill should be complete"); let created_at = chrono::Utc .with_ymd_and_hms(2025, 1, 3, 12, 0, 0) .single() @@ -280,6 +284,10 @@ async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() { codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None) .await .expect("state db should initialize"); + _runtime + .mark_backfill_complete(None) + .await + .expect("backfill should be complete"); let found = crate::rollout::find_thread_path_by_id_str(home, &uuid.to_string()) .await diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index 97846cdcf..ed2e8c264 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -54,30 +54,20 @@ pub(crate) async fn init_if_enabled( return None; } }; - let should_backfill = match runtime.get_backfill_state().await { - Ok(state) => state.status != codex_state::BackfillStatus::Complete, + let backfill_state = match runtime.get_backfill_state().await { + Ok(state) => state, Err(err) => { warn!( "failed to read backfill state at {}: {err}", config.codex_home.display() ); - true + return None; } }; - if should_backfill { - let runtime_for_backfill = Arc::clone(&runtime); - let config_for_backfill = config.clone(); - let otel_for_backfill = otel.cloned(); - tokio::task::spawn(async move { - metadata::backfill_sessions( - runtime_for_backfill.as_ref(), - &config_for_backfill, - otel_for_backfill.as_ref(), - ) - .await; - }); + if backfill_state.status != codex_state::BackfillStatus::Complete { + metadata::backfill_sessions(runtime.as_ref(), config, otel).await; } - Some(runtime) + require_backfill_complete(runtime, config.codex_home.as_path()).await } /// Get the DB if the feature is enabled and the DB exists. @@ -88,13 +78,14 @@ pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option { return None; } - codex_state::StateRuntime::init( + let runtime = codex_state::StateRuntime::init( config.codex_home.clone(), config.model_provider_id.clone(), otel.cloned(), ) .await - .ok() + .ok()?; + require_backfill_complete(runtime, config.codex_home.as_path()).await } /// Open the state runtime when the SQLite file exists, without feature gating. @@ -112,7 +103,31 @@ pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Optio ) .await .ok()?; - Some(runtime) + require_backfill_complete(runtime, codex_home).await +} + +async fn require_backfill_complete( + runtime: StateDbHandle, + codex_home: &Path, +) -> Option { + match runtime.get_backfill_state().await { + Ok(state) if state.status == codex_state::BackfillStatus::Complete => Some(runtime), + Ok(state) => { + warn!( + "state db backfill not complete at {} (status: {})", + codex_home.display(), + state.status.as_str() + ); + None + } + Err(err) => { + warn!( + "failed to read backfill state at {}: {err}", + codex_home.display() + ); + None + } + } } fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option { diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index 5b545384b..236a70279 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -59,6 +59,7 @@ async fn upsert_thread_metadata(codex_home: &Path, thread_id: ThreadId, rollout_ let runtime = StateRuntime::init(codex_home.to_path_buf(), "test-provider".to_string(), None) .await .unwrap(); + runtime.mark_backfill_complete(None).await.unwrap(); let mut builder = ThreadMetadataBuilder::new( thread_id, rollout_path,