feat: wait for backfill to be ready (#10790)

This commit is contained in:
jif-oai 2026-02-05 20:45:16 +00:00 committed by GitHub
parent 529b539564
commit 428a9f6035
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 43 additions and 19 deletions

View file

@ -62,6 +62,10 @@ async fn insert_state_db_thread(
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
.await
.expect("state db should initialize");
runtime
.mark_backfill_complete(None)
.await
.expect("backfill should be complete");
let created_at = chrono::Utc
.with_ymd_and_hms(2025, 1, 3, 12, 0, 0)
.single()
@ -280,6 +284,10 @@ async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() {
codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None)
.await
.expect("state db should initialize");
_runtime
.mark_backfill_complete(None)
.await
.expect("backfill should be complete");
let found = crate::rollout::find_thread_path_by_id_str(home, &uuid.to_string())
.await

View file

@ -54,30 +54,20 @@ pub(crate) async fn init_if_enabled(
return None;
}
};
let should_backfill = match runtime.get_backfill_state().await {
Ok(state) => state.status != codex_state::BackfillStatus::Complete,
let backfill_state = match runtime.get_backfill_state().await {
Ok(state) => state,
Err(err) => {
warn!(
"failed to read backfill state at {}: {err}",
config.codex_home.display()
);
true
return None;
}
};
if should_backfill {
let runtime_for_backfill = Arc::clone(&runtime);
let config_for_backfill = config.clone();
let otel_for_backfill = otel.cloned();
tokio::task::spawn(async move {
metadata::backfill_sessions(
runtime_for_backfill.as_ref(),
&config_for_backfill,
otel_for_backfill.as_ref(),
)
.await;
});
if backfill_state.status != codex_state::BackfillStatus::Complete {
metadata::backfill_sessions(runtime.as_ref(), config, otel).await;
}
Some(runtime)
require_backfill_complete(runtime, config.codex_home.as_path()).await
}
/// Get the DB if the feature is enabled and the DB exists.
@ -88,13 +78,14 @@ pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option
{
return None;
}
codex_state::StateRuntime::init(
let runtime = codex_state::StateRuntime::init(
config.codex_home.clone(),
config.model_provider_id.clone(),
otel.cloned(),
)
.await
.ok()
.ok()?;
require_backfill_complete(runtime, config.codex_home.as_path()).await
}
/// Open the state runtime when the SQLite file exists, without feature gating.
@ -112,7 +103,31 @@ pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Optio
)
.await
.ok()?;
Some(runtime)
require_backfill_complete(runtime, codex_home).await
}
async fn require_backfill_complete(
runtime: StateDbHandle,
codex_home: &Path,
) -> Option<StateDbHandle> {
match runtime.get_backfill_state().await {
Ok(state) if state.status == codex_state::BackfillStatus::Complete => Some(runtime),
Ok(state) => {
warn!(
"state db backfill not complete at {} (status: {})",
codex_home.display(),
state.status.as_str()
);
None
}
Err(err) => {
warn!(
"failed to read backfill state at {}: {err}",
codex_home.display()
);
None
}
}
}
fn cursor_to_anchor(cursor: Option<&Cursor>) -> Option<codex_state::Anchor> {

View file

@ -59,6 +59,7 @@ async fn upsert_thread_metadata(codex_home: &Path, thread_id: ThreadId, rollout_
let runtime = StateRuntime::init(codex_home.to_path_buf(), "test-provider".to_string(), None)
.await
.unwrap();
runtime.mark_backfill_complete(None).await.unwrap();
let mut builder = ThreadMetadataBuilder::new(
thread_id,
rollout_path,