diff --git a/codex-rs/core/src/rollout/metadata.rs b/codex-rs/core/src/rollout/metadata.rs index ec11ca324..b87879f4c 100644 --- a/codex-rs/core/src/rollout/metadata.rs +++ b/codex-rs/core/src/rollout/metadata.rs @@ -13,14 +13,15 @@ use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; +use codex_state::BackfillState; use codex_state::BackfillStats; +use codex_state::BackfillStatus; use codex_state::DB_ERROR_METRIC; use codex_state::DB_METRIC_BACKFILL; use codex_state::DB_METRIC_BACKFILL_DURATION_MS; use codex_state::ExtractionOutcome; use codex_state::ThreadMetadataBuilder; use codex_state::apply_rollout_item; -use std::cmp::Reverse; use std::path::Path; use std::path::PathBuf; use tracing::info; @@ -28,6 +29,7 @@ use tracing::warn; const ROLLOUT_PREFIX: &str = "rollout-"; const ROLLOUT_SUFFIX: &str = ".jsonl"; +const BACKFILL_BATCH_SIZE: usize = 200; pub(crate) fn builder_from_session_meta( session_meta: &SessionMetaLine, @@ -130,16 +132,52 @@ pub(crate) async fn backfill_sessions( otel: Option<&OtelManager>, ) { let timer = otel.and_then(|otel| otel.start_timer(DB_METRIC_BACKFILL_DURATION_MS, &[]).ok()); + let mut backfill_state = match runtime.get_backfill_state().await { + Ok(state) => state, + Err(err) => { + warn!( + "failed to read backfill state at {}: {err}", + config.codex_home.display() + ); + if let Some(otel) = otel { + otel.counter(DB_ERROR_METRIC, 1, &[("stage", "backfill_state_read")]); + } + BackfillState::default() + } + }; + if backfill_state.status == BackfillStatus::Complete { + return; + } + if let Err(err) = runtime.mark_backfill_running().await { + warn!( + "failed to mark backfill running at {}: {err}", + config.codex_home.display() + ); + if let Some(otel) = otel { + otel.counter( + DB_ERROR_METRIC, + 1, + &[("stage", "backfill_state_mark_running")], + ); + } + } else { + backfill_state.status = 
BackfillStatus::Running; + } + let sessions_root = config.codex_home.join(rollout::SESSIONS_SUBDIR); let archived_root = config.codex_home.join(rollout::ARCHIVED_SESSIONS_SUBDIR); - let mut rollout_paths: Vec<(PathBuf, bool)> = Vec::new(); + let mut rollout_paths: Vec<BackfillRolloutPath> = Vec::new(); for (root, archived) in [(sessions_root, false), (archived_root, true)] { if !tokio::fs::try_exists(&root).await.unwrap_or(false) { continue; } match collect_rollout_paths(&root).await { Ok(paths) => { - rollout_paths.extend(paths.into_iter().map(|path| (path, archived))); + rollout_paths.extend(paths.into_iter().map(|path| BackfillRolloutPath { + watermark: backfill_watermark_for_path(config.codex_home.as_path(), &path), + path, + archived, + })); } Err(err) => { warn!( @@ -149,75 +187,126 @@ pub(crate) async fn backfill_sessions( } } } - rollout_paths.sort_by_key(|(path, _archived)| { - let parsed = path - .file_name() - .and_then(|name| name.to_str()) - .and_then(parse_timestamp_uuid_from_filename) - .unwrap_or((time::OffsetDateTime::UNIX_EPOCH, uuid::Uuid::nil())); - (Reverse(parsed.0), Reverse(parsed.1)) - }); + rollout_paths.sort_by(|a, b| a.watermark.cmp(&b.watermark)); + if let Some(last_watermark) = backfill_state.last_watermark.as_deref() { + rollout_paths.retain(|entry| entry.watermark.as_str() > last_watermark); + } + let mut stats = BackfillStats { scanned: 0, upserted: 0, failed: 0, }; - for (path, archived) in rollout_paths { - stats.scanned = stats.scanned.saturating_add(1); - match extract_metadata_from_rollout(&path, config.model_provider_id.as_str(), otel).await { - Ok(outcome) => { - if outcome.parse_errors > 0 - && let Some(otel) = otel - { - otel.counter( - DB_ERROR_METRIC, - outcome.parse_errors as i64, - &[("stage", "backfill_sessions")], - ); - } - let mut metadata = outcome.metadata; - if archived && metadata.archived_at.is_none() { - let fallback_archived_at = metadata.updated_at; - metadata.archived_at = file_modified_time_utc(&path) - .await - 
.or(Some(fallback_archived_at)); - } - if let Err(err) = runtime.upsert_thread(&metadata).await { - stats.failed = stats.failed.saturating_add(1); - warn!("failed to upsert rollout {}: {err}", path.display()); - } else { - stats.upserted = stats.upserted.saturating_add(1); - if let Ok(meta_line) = rollout::list::read_session_meta_line(&path).await { - if let Err(err) = runtime - .persist_dynamic_tools( - meta_line.meta.id, - meta_line.meta.dynamic_tools.as_deref(), - ) - .await - { - if let Some(otel) = otel { - otel.counter( - DB_ERROR_METRIC, - 1, - &[("stage", "backfill_dynamic_tools")], - ); - } - warn!("failed to backfill dynamic tools {}: {err}", path.display()); - } - } else { - warn!( - "failed to read session meta for dynamic tools {}", - path.display() + let mut last_watermark = backfill_state.last_watermark.clone(); + for batch in rollout_paths.chunks(BACKFILL_BATCH_SIZE) { + for rollout in batch { + stats.scanned = stats.scanned.saturating_add(1); + match extract_metadata_from_rollout( + &rollout.path, + config.model_provider_id.as_str(), + otel, + ) + .await + { + Ok(outcome) => { + if outcome.parse_errors > 0 + && let Some(otel) = otel + { + otel.counter( + DB_ERROR_METRIC, + outcome.parse_errors as i64, + &[("stage", "backfill_sessions")], ); } + let mut metadata = outcome.metadata; + if rollout.archived && metadata.archived_at.is_none() { + let fallback_archived_at = metadata.updated_at; + metadata.archived_at = file_modified_time_utc(&rollout.path) + .await + .or(Some(fallback_archived_at)); + } + if let Err(err) = runtime.upsert_thread(&metadata).await { + stats.failed = stats.failed.saturating_add(1); + warn!("failed to upsert rollout {}: {err}", rollout.path.display()); + } else { + stats.upserted = stats.upserted.saturating_add(1); + if let Ok(meta_line) = + rollout::list::read_session_meta_line(&rollout.path).await + { + if let Err(err) = runtime + .persist_dynamic_tools( + meta_line.meta.id, + meta_line.meta.dynamic_tools.as_deref(), + ) + 
.await + { + if let Some(otel) = otel { + otel.counter( + DB_ERROR_METRIC, + 1, + &[("stage", "backfill_dynamic_tools")], + ); + } + warn!( + "failed to backfill dynamic tools {}: {err}", + rollout.path.display() + ); + } + } else { + warn!( + "failed to read session meta for dynamic tools {}", + rollout.path.display() + ); + } + } + } + Err(err) => { + stats.failed = stats.failed.saturating_add(1); + warn!( + "failed to extract rollout {}: {err}", + rollout.path.display() + ); } } - Err(err) => { - stats.failed = stats.failed.saturating_add(1); - warn!("failed to extract rollout {}: {err}", path.display()); + } + + if let Some(last_entry) = batch.last() { + if let Err(err) = runtime + .checkpoint_backfill(last_entry.watermark.as_str()) + .await + { + warn!( + "failed to checkpoint backfill at {}: {err}", + config.codex_home.display() + ); + if let Some(otel) = otel { + otel.counter( + DB_ERROR_METRIC, + 1, + &[("stage", "backfill_state_checkpoint")], + ); + } + } else { + last_watermark = Some(last_entry.watermark.clone()); } } } + if let Err(err) = runtime + .mark_backfill_complete(last_watermark.as_deref()) + .await + { + warn!( + "failed to mark backfill complete at {}: {err}", + config.codex_home.display() + ); + if let Some(otel) = otel { + otel.counter( + DB_ERROR_METRIC, + 1, + &[("stage", "backfill_state_mark_complete")], + ); + } + } info!( "state db backfill scanned={}, upserted={}, failed={}", @@ -247,6 +336,20 @@ pub(crate) async fn backfill_sessions( } } +#[derive(Debug, Clone)] +struct BackfillRolloutPath { + watermark: String, + path: PathBuf, + archived: bool, +} + +fn backfill_watermark_for_path(codex_home: &Path, path: &Path) -> String { + path.strip_prefix(codex_home) + .unwrap_or(path) + .to_string_lossy() + .replace('\\', "/") +} + async fn file_modified_time_utc(path: &Path) -> Option<DateTime<Utc>> { let modified = tokio::fs::metadata(path).await.ok()?.modified().ok()?; let updated_at: DateTime<Utc> = modified.into(); @@ -331,10 +434,13 @@ mod tests { use 
codex_protocol::protocol::SessionMeta; use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; + use codex_state::BackfillStatus; use codex_state::ThreadMetadataBuilder; use pretty_assertions::assert_eq; use std::fs::File; use std::io::Write; + use std::path::Path; + use std::path::PathBuf; use tempfile::tempdir; use uuid::Uuid; @@ -412,4 +518,108 @@ mod tests { assert_eq!(builder, expected); } + + #[tokio::test] + async fn backfill_sessions_resumes_from_watermark_and_marks_complete() { + let dir = tempdir().expect("tempdir"); + let codex_home = dir.path().to_path_buf(); + let first_uuid = Uuid::new_v4(); + let second_uuid = Uuid::new_v4(); + let first_path = write_rollout_in_sessions( + codex_home.as_path(), + "2026-01-27T12-34-56", + "2026-01-27T12:34:56Z", + first_uuid, + ); + let second_path = write_rollout_in_sessions( + codex_home.as_path(), + "2026-01-27T12-35-56", + "2026-01-27T12:35:56Z", + second_uuid, + ); + + let runtime = + codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + let first_watermark = + backfill_watermark_for_path(codex_home.as_path(), first_path.as_path()); + runtime.mark_backfill_running().await.expect("mark running"); + runtime + .checkpoint_backfill(first_watermark.as_str()) + .await + .expect("checkpoint first watermark"); + + let mut config = crate::config::test_config(); + config.codex_home = codex_home.clone(); + config.model_provider_id = "test-provider".to_string(); + backfill_sessions(runtime.as_ref(), &config, None).await; + + let first_id = ThreadId::from_string(&first_uuid.to_string()).expect("first thread id"); + let second_id = ThreadId::from_string(&second_uuid.to_string()).expect("second thread id"); + assert_eq!( + runtime + .get_thread(first_id) + .await + .expect("get first thread"), + None + ); + assert!( + runtime + .get_thread(second_id) + .await + .expect("get second thread") + .is_some() + ); + + let 
state = runtime + .get_backfill_state() + .await + .expect("get backfill state"); + assert_eq!(state.status, BackfillStatus::Complete); + assert_eq!( + state.last_watermark, + Some(backfill_watermark_for_path( + codex_home.as_path(), + second_path.as_path() + )) + ); + assert!(state.last_success_at.is_some()); + } + + fn write_rollout_in_sessions( + codex_home: &Path, + filename_ts: &str, + event_ts: &str, + thread_uuid: Uuid, + ) -> PathBuf { + let id = ThreadId::from_string(&thread_uuid.to_string()).expect("thread id"); + let sessions_dir = codex_home.join("sessions"); + std::fs::create_dir_all(sessions_dir.as_path()).expect("create sessions dir"); + let path = sessions_dir.join(format!("rollout-{filename_ts}-{thread_uuid}.jsonl")); + let session_meta = SessionMeta { + id, + forked_from_id: None, + timestamp: event_ts.to_string(), + cwd: codex_home.to_path_buf(), + originator: "cli".to_string(), + cli_version: "0.0.0".to_string(), + source: SessionSource::default(), + model_provider: Some("test-provider".to_string()), + base_instructions: None, + dynamic_tools: None, + }; + let session_meta_line = SessionMetaLine { + meta: session_meta, + git: None, + }; + let rollout_line = RolloutLine { + timestamp: event_ts.to_string(), + item: RolloutItem::SessionMeta(session_meta_line), + }; + let json = serde_json::to_string(&rollout_line).expect("serialize rollout"); + let mut file = File::create(&path).expect("create rollout"); + writeln!(file, "{json}").expect("write rollout"); + path + } } diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 465bcbbcd..89f8829a2 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -19,6 +19,7 @@ use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::{self}; use tokio::sync::oneshot; use tracing::info; +use tracing::trace; use tracing::warn; use super::ARCHIVED_SESSIONS_SUBDIR; @@ -386,7 +387,7 @@ impl RolloutRecorder { pub(crate) async fn 
load_rollout_items( path: &Path, ) -> std::io::Result<(Vec, Option, usize)> { - info!("Resuming rollout from {path:?}"); + trace!("Resuming rollout from {path:?}"); let text = tokio::fs::read_to_string(path).await?; if text.trim().is_empty() { return Err(IoError::other("empty session file")); @@ -433,7 +434,7 @@ impl RolloutRecorder { } }, Err(e) => { - warn!("failed to parse rollout line: {e}"); + trace!("failed to parse rollout line: {e}"); parse_errors = parse_errors.saturating_add(1); } } diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index c36d8afe4..4ea1435ea 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -31,11 +31,9 @@ pub(crate) async fn init_if_enabled( config: &Config, otel: Option<&OtelManager>, ) -> Option { - let state_path = codex_state::state_db_path(config.codex_home.as_path()); if !config.features.enabled(Feature::Sqlite) { return None; } - let existed = tokio::fs::try_exists(&state_path).await.unwrap_or(false); let runtime = match codex_state::StateRuntime::init( config.codex_home.clone(), config.model_provider_id.clone(), @@ -55,7 +53,17 @@ pub(crate) async fn init_if_enabled( return None; } }; - if !existed { + let should_backfill = match runtime.get_backfill_state().await { + Ok(state) => state.status != codex_state::BackfillStatus::Complete, + Err(err) => { + warn!( + "failed to read backfill state at {}: {err}", + config.codex_home.display() + ); + true + } + }; + if should_backfill { let runtime_for_backfill = Arc::clone(&runtime); let config_for_backfill = config.clone(); let otel_for_backfill = otel.cloned(); diff --git a/codex-rs/state/migrations/0008_backfill_state.sql b/codex-rs/state/migrations/0008_backfill_state.sql new file mode 100644 index 000000000..c9fc1fdeb --- /dev/null +++ b/codex-rs/state/migrations/0008_backfill_state.sql @@ -0,0 +1,17 @@ +CREATE TABLE backfill_state ( + id INTEGER PRIMARY KEY CHECK (id = 1), + status TEXT NOT NULL, + last_watermark TEXT, + 
last_success_at INTEGER, + updated_at INTEGER NOT NULL +); + +INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) +VALUES ( + 1, + 'pending', + NULL, + NULL, + CAST(strftime('%s', 'now') AS INTEGER) +) +ON CONFLICT(id) DO NOTHING; diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index 7df337409..1625554e2 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -22,7 +22,9 @@ pub use runtime::StateRuntime; /// Most consumers should prefer [`StateRuntime`]. pub use extract::apply_rollout_item; pub use model::Anchor; +pub use model::BackfillState; pub use model::BackfillStats; +pub use model::BackfillStatus; pub use model::ExtractionOutcome; pub use model::SortKey; pub use model::ThreadMemory; @@ -36,9 +38,9 @@ pub use runtime::state_db_path; /// Errors encountered during DB operations. Tags: [stage] pub const DB_ERROR_METRIC: &str = "codex.db.error"; -/// Metrics on backfill process during first init of the db. Tags: [status] +/// Metrics on backfill process. Tags: [status] pub const DB_METRIC_BACKFILL: &str = "codex.db.backfill"; -/// Metrics on backfill duration during first init of the db. Tags: [status] +/// Metrics on backfill duration. Tags: [status] pub const DB_METRIC_BACKFILL_DURATION_MS: &str = "codex.db.backfill.duration_ms"; /// Metrics on errors during comparison between DB and rollout file. Tags: [stage] pub const DB_METRIC_COMPARE_ERROR: &str = "codex.db.compare_error"; diff --git a/codex-rs/state/src/model/backfill_state.rs b/codex-rs/state/src/model/backfill_state.rs new file mode 100644 index 000000000..353929f98 --- /dev/null +++ b/codex-rs/state/src/model/backfill_state.rs @@ -0,0 +1,73 @@ +use anyhow::Result; +use chrono::DateTime; +use chrono::Utc; +use sqlx::Row; +use sqlx::sqlite::SqliteRow; + +/// Persisted lifecycle state for rollout metadata backfill. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct BackfillState { + /// Current lifecycle status. 
+ pub status: BackfillStatus, + /// Last processed rollout watermark. + pub last_watermark: Option<String>, + /// Last successful completion time. + pub last_success_at: Option<DateTime<Utc>>, +} + +impl Default for BackfillState { + fn default() -> Self { + Self { + status: BackfillStatus::Pending, + last_watermark: None, + last_success_at: None, + } + } +} + +impl BackfillState { + pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> { + let status: String = row.try_get("status")?; + let last_success_at = row + .try_get::<Option<i64>, _>("last_success_at")? + .map(epoch_seconds_to_datetime) + .transpose()?; + Ok(Self { + status: BackfillStatus::parse(status.as_str())?, + last_watermark: row.try_get("last_watermark")?, + last_success_at, + }) + } +} + +/// Backfill lifecycle status. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BackfillStatus { + Pending, + Running, + Complete, +} + +impl BackfillStatus { + pub const fn as_str(self) -> &'static str { + match self { + BackfillStatus::Pending => "pending", + BackfillStatus::Running => "running", + BackfillStatus::Complete => "complete", + } + } + + pub fn parse(value: &str) -> Result<Self> { + match value { + "pending" => Ok(Self::Pending), + "running" => Ok(Self::Running), + "complete" => Ok(Self::Complete), + _ => Err(anyhow::anyhow!("invalid backfill status: {value}")), + } + } +} + +fn epoch_seconds_to_datetime(secs: i64) -> Result<DateTime<Utc>> { + DateTime::<Utc>::from_timestamp(secs, 0) + .ok_or_else(|| anyhow::anyhow!("invalid unix timestamp: {secs}")) +} diff --git a/codex-rs/state/src/model/mod.rs b/codex-rs/state/src/model/mod.rs index d937019ec..6bec8875d 100644 --- a/codex-rs/state/src/model/mod.rs +++ b/codex-rs/state/src/model/mod.rs @@ -1,7 +1,10 @@ +mod backfill_state; mod log; mod thread_memory; mod thread_metadata; +pub use backfill_state::BackfillState; +pub use backfill_state::BackfillStatus; pub use log::LogEntry; pub use log::LogQuery; pub use log::LogRow; diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 
25c666fa6..ae28a0219 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -91,6 +91,80 @@ impl StateRuntime { self.codex_home.as_path() } + /// Get persisted rollout metadata backfill state. + pub async fn get_backfill_state(&self) -> anyhow::Result<crate::BackfillState> { + self.ensure_backfill_state_row().await?; + let row = sqlx::query( + r#" +SELECT status, last_watermark, last_success_at +FROM backfill_state +WHERE id = 1 + "#, + ) + .fetch_one(self.pool.as_ref()) + .await?; + crate::BackfillState::try_from_row(&row) + } + + /// Mark rollout metadata backfill as running. + pub async fn mark_backfill_running(&self) -> anyhow::Result<()> { + self.ensure_backfill_state_row().await?; + sqlx::query( + r#" +UPDATE backfill_state +SET status = ?, updated_at = ? +WHERE id = 1 + "#, + ) + .bind(crate::BackfillStatus::Running.as_str()) + .bind(Utc::now().timestamp()) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } + + /// Persist rollout metadata backfill progress. + pub async fn checkpoint_backfill(&self, watermark: &str) -> anyhow::Result<()> { + self.ensure_backfill_state_row().await?; + sqlx::query( + r#" +UPDATE backfill_state +SET status = ?, last_watermark = ?, updated_at = ? +WHERE id = 1 + "#, + ) + .bind(crate::BackfillStatus::Running.as_str()) + .bind(watermark) + .bind(Utc::now().timestamp()) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } + + /// Mark rollout metadata backfill as complete. + pub async fn mark_backfill_complete(&self, last_watermark: Option<&str>) -> anyhow::Result<()> { + self.ensure_backfill_state_row().await?; + let now = Utc::now().timestamp(); + sqlx::query( + r#" +UPDATE backfill_state +SET + status = ?, + last_watermark = COALESCE(?, last_watermark), + last_success_at = ?, + updated_at = ? 
+WHERE id = 1 + "#, + ) + .bind(crate::BackfillStatus::Complete.as_str()) + .bind(last_watermark) + .bind(now) + .bind(now) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } + /// Load thread metadata by id using the underlying database. pub async fn get_thread(&self, id: ThreadId) -> anyhow::Result<Option<ThreadMetadata>> { let row = sqlx::query( @@ -637,6 +711,22 @@ ON CONFLICT(thread_id, position) DO NOTHING } self.upsert_thread(&metadata).await } + + async fn ensure_backfill_state_row(&self) -> anyhow::Result<()> { + sqlx::query( + r#" +INSERT INTO backfill_state (id, status, last_watermark, last_success_at, updated_at) +VALUES (?, ?, NULL, NULL, ?) +ON CONFLICT(id) DO NOTHING + "#, + ) + .bind(1_i64) + .bind(crate::BackfillStatus::Pending.as_str()) + .bind(Utc::now().timestamp()) + .execute(self.pool.as_ref()) + .await?; + Ok(()) + } } fn push_log_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, query: &'a LogQuery) { @@ -889,7 +979,10 @@ mod tests { let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .map_or(0, |duration| duration.as_nanos()); - std::env::temp_dir().join(format!("codex-state-runtime-test-{nanos}")) + std::env::temp_dir().join(format!( + "codex-state-runtime-test-{nanos}-{}", + Uuid::new_v4() + )) } #[tokio::test] @@ -967,6 +1060,59 @@ mod tests { let _ = tokio::fs::remove_dir_all(codex_home).await; } + #[tokio::test] + async fn backfill_state_persists_progress_and_completion() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + let initial = runtime + .get_backfill_state() + .await + .expect("get initial backfill state"); + assert_eq!(initial.status, crate::BackfillStatus::Pending); + assert_eq!(initial.last_watermark, None); + assert_eq!(initial.last_success_at, None); + + runtime + .mark_backfill_running() + .await + .expect("mark backfill running"); + runtime + .checkpoint_backfill("sessions/2026/01/27/rollout-a.jsonl") + 
.await + .expect("checkpoint backfill"); + + let running = runtime + .get_backfill_state() + .await + .expect("get running backfill state"); + assert_eq!(running.status, crate::BackfillStatus::Running); + assert_eq!( + running.last_watermark, + Some("sessions/2026/01/27/rollout-a.jsonl".to_string()) + ); + assert_eq!(running.last_success_at, None); + + runtime + .mark_backfill_complete(Some("sessions/2026/01/28/rollout-b.jsonl")) + .await + .expect("mark backfill complete"); + let completed = runtime + .get_backfill_state() + .await + .expect("get completed backfill state"); + assert_eq!(completed.status, crate::BackfillStatus::Complete); + assert_eq!( + completed.last_watermark, + Some("sessions/2026/01/28/rollout-b.jsonl".to_string()) + ); + assert!(completed.last_success_at.is_some()); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn upsert_and_get_thread_memory() { let codex_home = unique_temp_dir();