diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 487f3e899..7fc34083f 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2395,7 +2395,6 @@ dependencies = [ "anyhow", "chrono", "clap", - "codex-otel", "codex-protocol", "dirs", "log", diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index bebce0251..b2d5c0a75 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -2205,7 +2205,7 @@ impl CodexMessageProcessor { let loaded_thread = self.thread_manager.get_thread(thread_uuid).await.ok(); let mut state_db_ctx = loaded_thread.as_ref().and_then(|thread| thread.state_db()); if state_db_ctx.is_none() { - state_db_ctx = get_state_db(&self.config, None).await; + state_db_ctx = get_state_db(&self.config).await; } let Some(state_db_ctx) = state_db_ctx else { self.send_internal_error( @@ -2506,7 +2506,7 @@ impl CodexMessageProcessor { let rollout_path_display = archived_path.display().to_string(); let fallback_provider = self.config.model_provider_id.clone(); - let state_db_ctx = get_state_db(&self.config, None).await; + let state_db_ctx = get_state_db(&self.config).await; let archived_folder = self .config .codex_home @@ -3941,7 +3941,7 @@ impl CodexMessageProcessor { let fallback_provider = self.config.model_provider_id.clone(); let (allowed_sources_vec, source_kind_filter) = compute_source_filters(source_kinds); let allowed_sources = allowed_sources_vec.as_slice(); - let state_db_ctx = get_state_db(&self.config, None).await; + let state_db_ctx = get_state_db(&self.config).await; while remaining > 0 { let page_size = remaining.min(THREAD_LIST_MAX_LIMIT); @@ -4797,7 +4797,7 @@ impl CodexMessageProcessor { self.finalize_thread_teardown(thread_id).await; if state_db_ctx.is_none() { - state_db_ctx = get_state_db(&self.config, None).await; + state_db_ctx = get_state_db(&self.config).await; } // Move the rollout file to archived. @@ -6557,7 +6557,7 @@ impl CodexMessageProcessor { if let Some(log_db) = self.log_db.as_ref() { log_db.flush().await; } - let state_db_ctx = get_state_db(&self.config, None).await; + let state_db_ctx = get_state_db(&self.config).await; match (state_db_ctx.as_ref(), conversation_id) { (Some(state_db_ctx), Some(conversation_id)) => { let thread_id_text = conversation_id.to_string(); @@ -7201,7 +7201,7 @@ async fn read_history_cwd_from_state_db( thread_id: Option, rollout_path: &Path, ) -> Option { - if let Some(state_db_ctx) = get_state_db(config, None).await + if let Some(state_db_ctx) = get_state_db(config).await && let Some(thread_id) = thread_id && let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await { @@ -7222,7 +7222,7 @@ async fn read_summary_from_state_db_by_thread_id( config: &Config, thread_id: ThreadId, ) -> Option { - let state_db_ctx = get_state_db(config, None).await; + let state_db_ctx = get_state_db(config).await; read_summary_from_state_db_context_by_thread_id(state_db_ctx.as_ref(), thread_id).await } diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index b08bbbb98..34796c575 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -502,7 +502,6 @@ pub async fn run_main_with_transport( let log_db = codex_state::StateRuntime::init( config.sqlite_home.clone(), config.model_provider_id.clone(), - None, ) .await .ok() diff --git a/codex-rs/app-server/tests/suite/v2/thread_list.rs b/codex-rs/app-server/tests/suite/v2/thread_list.rs index 96aa2ec76..79031ddd3 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_list.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_list.rs @@ -556,12 +556,9 @@ sqlite = true // `thread/list` only applies `search_term` on the sqlite path. In this test we // create rollouts manually, so we must also create the sqlite DB and mark backfill // complete; otherwise app-server will permanently use filesystem fallback. - let state_db = codex_state::StateRuntime::init( - codex_home.path().to_path_buf(), - "mock_provider".into(), - None, - ) - .await?; + let state_db = + codex_state::StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()) + .await?; state_db.mark_backfill_complete(None).await?; let mut mcp = init_mcp(codex_home.path()).await?; diff --git a/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs b/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs index 50917ee8a..6024fe47f 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_metadata_update.rs @@ -428,8 +428,7 @@ async fn thread_metadata_update_can_clear_stored_git_fields() -> Result<()> { } async fn init_state_db(codex_home: &Path) -> Result> { - let state_db = - StateRuntime::init(codex_home.to_path_buf(), "mock_provider".into(), None).await?; + let state_db = StateRuntime::init(codex_home.to_path_buf(), "mock_provider".into()).await?; state_db.mark_backfill_complete(None).await?; Ok(state_db) } diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index ca85a0ca4..1812230e9 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -321,12 +321,8 @@ stream_max_retries = 0 .join("\n") + "\n", )?; - let state_db = StateRuntime::init( - codex_home.path().to_path_buf(), - "mock_provider".into(), - None, - ) - .await?; + let state_db = + StateRuntime::init(codex_home.path().to_path_buf(), "mock_provider".into()).await?; state_db.mark_backfill_complete(None).await?; let mut mcp = McpProcess::new(codex_home.path()).await?; diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 41b836186..d977979b8 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -904,12 +904,9 @@ async fn run_debug_clear_memories_command( let state_path = state_db_path(config.sqlite_home.as_path()); let mut cleared_state_db = false; if tokio::fs::try_exists(&state_path).await? { - let state_db = StateRuntime::init( - config.sqlite_home.clone(), - config.model_provider_id.clone(), - None, - ) - .await?; + let state_db = + StateRuntime::init(config.sqlite_home.clone(), config.model_provider_id.clone()) + .await?; state_db.reset_memory_data_for_fresh_start().await?; cleared_state_db = true; } diff --git a/codex-rs/cli/tests/debug_clear_memories.rs b/codex-rs/cli/tests/debug_clear_memories.rs index d8db1ebbc..c68172ba3 100644 --- a/codex-rs/cli/tests/debug_clear_memories.rs +++ b/codex-rs/cli/tests/debug_clear_memories.rs @@ -16,12 +16,8 @@ fn codex_command(codex_home: &Path) -> Result { #[tokio::test] async fn debug_clear_memories_resets_state_and_removes_memory_dir() -> Result<()> { let codex_home = TempDir::new()?; - let runtime = StateRuntime::init( - codex_home.path().to_path_buf(), - "test-provider".to_string(), - None, - ) - .await?; + let runtime = + StateRuntime::init(codex_home.path().to_path_buf(), "test-provider".to_string()).await?; drop(runtime); let thread_id = "00000000-0000-0000-0000-000000000123"; diff --git a/codex-rs/core/src/agent/control.rs b/codex-rs/core/src/agent/control.rs index e7547f9c9..38f2cf73c 100644 --- a/codex-rs/core/src/agent/control.rs +++ b/codex-rs/core/src/agent/control.rs @@ -236,7 +236,7 @@ impl AgentControl { // Collab resume callers rebuild a placeholder ThreadSpawn source. Rehydrate the // stored nickname/role from sqlite when available; otherwise leave both unset. let (resumed_agent_nickname, resumed_agent_role) = - if let Some(state_db_ctx) = state_db::get_state_db(&config, None).await { + if let Some(state_db_ctx) = state_db::get_state_db(&config).await { match state_db_ctx.get_thread(thread_id).await { Ok(Some(metadata)) => (metadata.agent_nickname, metadata.agent_role), Ok(None) | Err(_) => (None, None), diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 87240d398..6354745e9 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -451,7 +451,7 @@ impl Codex { }; match thread_id { Some(thread_id) => { - let state_db_ctx = state_db::get_state_db(&config, None).await; + let state_db_ctx = state_db::get_state_db(&config).await; state_db::get_dynamic_tools(state_db_ctx.as_deref(), thread_id, "codex_spawn") .await } @@ -1273,7 +1273,7 @@ impl Session { if config.ephemeral { Ok::<_, anyhow::Error>((None, None)) } else { - let state_db_ctx = state_db::init(&config, None).await; + let state_db_ctx = state_db::init(&config).await; let rollout_recorder = RolloutRecorder::new( &config, rollout_params, diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index 1c0fa7b29..8914beaac 100644 --- a/codex-rs/core/src/memories/tests.rs +++ b/codex-rs/core/src/memories/tests.rs @@ -474,7 +474,6 @@ mod phase2 { let state_db = codex_state::StateRuntime::init( config.codex_home.clone(), config.model_provider_id.clone(), - None, ) .await .expect("initialize state db"); @@ -857,7 +856,6 @@ mod phase2 { let state_db = codex_state::StateRuntime::init( config.codex_home.clone(), config.model_provider_id.clone(), - None, ) .await .expect("initialize state db"); diff --git a/codex-rs/core/src/rollout/metadata.rs b/codex-rs/core/src/rollout/metadata.rs index 3d46df0f8..3b18520ee 100644 --- a/codex-rs/core/src/rollout/metadata.rs +++ b/codex-rs/core/src/rollout/metadata.rs @@ -7,7 +7,6 @@ use chrono::DateTime; use chrono::NaiveDateTime; use chrono::Timelike; use chrono::Utc; -use codex_otel::SessionTelemetry; use codex_protocol::ThreadId; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::RolloutItem; @@ -96,7 +95,6 @@ pub(crate) fn builder_from_items( pub(crate) async fn extract_metadata_from_rollout( rollout_path: &Path, default_provider: &str, - otel: Option<&SessionTelemetry>, ) -> anyhow::Result { let (items, _thread_id, parse_errors) = RolloutRecorder::load_rollout_items(rollout_path).await?; @@ -119,15 +117,6 @@ pub(crate) async fn extract_metadata_from_rollout( if let Some(updated_at) = file_modified_time_utc(rollout_path).await { metadata.updated_at = updated_at; } - if parse_errors > 0 - && let Some(otel) = otel - { - otel.counter( - DB_ERROR_METRIC, - parse_errors as i64, - &[("stage", "extract_metadata_from_rollout")], - ); - } Ok(ExtractionOutcome { metadata, memory_mode: items.iter().rev().find_map(|item| match item { @@ -141,12 +130,11 @@ pub(crate) async fn extract_metadata_from_rollout( }) } -pub(crate) async fn backfill_sessions( - runtime: &codex_state::StateRuntime, - config: &Config, - otel: Option<&SessionTelemetry>, -) { - let timer = otel.and_then(|otel| otel.start_timer(DB_METRIC_BACKFILL_DURATION_MS, &[]).ok()); +pub(crate) async fn backfill_sessions(runtime: &codex_state::StateRuntime, config: &Config) { + let metric_client = codex_otel::metrics::global(); + let timer = metric_client + .as_ref() + .and_then(|otel| otel.start_timer(DB_METRIC_BACKFILL_DURATION_MS, &[]).ok()); let backfill_state = match runtime.get_backfill_state().await { Ok(state) => state, Err(err) => { @@ -154,9 +142,6 @@ pub(crate) async fn backfill_sessions( "failed to read backfill state at {}: {err}", config.codex_home.display() ); - if let Some(otel) = otel { - otel.counter(DB_ERROR_METRIC, 1, &[("stage", "backfill_state_read")]); - } BackfillState::default() } }; @@ -170,13 +155,6 @@ pub(crate) async fn backfill_sessions( "failed to claim backfill worker at {}: {err}", config.codex_home.display() ); - if let Some(otel) = otel { - otel.counter( - DB_ERROR_METRIC, - 1, - &[("stage", "backfill_state_claim_running")], - ); - } return; } }; @@ -194,13 +172,6 @@ pub(crate) async fn backfill_sessions( "failed to read claimed backfill state at {}: {err}", config.codex_home.display() ); - if let Some(otel) = otel { - otel.counter( - DB_ERROR_METRIC, - 1, - &[("stage", "backfill_state_read_claimed")], - ); - } BackfillState { status: BackfillStatus::Running, ..Default::default() @@ -213,13 +184,6 @@ pub(crate) async fn backfill_sessions( "failed to mark backfill running at {}: {err}", config.codex_home.display() ); - if let Some(otel) = otel { - otel.counter( - DB_ERROR_METRIC, - 1, - &[("stage", "backfill_state_mark_running")], - ); - } } else { backfill_state.status = BackfillStatus::Running; } @@ -262,18 +226,14 @@ pub(crate) async fn backfill_sessions( for batch in rollout_paths.chunks(BACKFILL_BATCH_SIZE) { for rollout in batch { stats.scanned = stats.scanned.saturating_add(1); - match extract_metadata_from_rollout( - &rollout.path, - config.model_provider_id.as_str(), - otel, - ) - .await + match extract_metadata_from_rollout(&rollout.path, config.model_provider_id.as_str()) + .await { Ok(outcome) => { if outcome.parse_errors > 0 - && let Some(otel) = otel + && let Some(ref metric_client) = metric_client { - otel.counter( + let _ = metric_client.counter( DB_ERROR_METRIC, outcome.parse_errors as i64, &[("stage", "backfill_sessions")], @@ -317,13 +277,6 @@ pub(crate) async fn backfill_sessions( ) .await { - if let Some(otel) = otel { - otel.counter( - DB_ERROR_METRIC, - 1, - &[("stage", "backfill_dynamic_tools")], - ); - } warn!( "failed to backfill dynamic tools {}: {err}", rollout.path.display() @@ -356,13 +309,6 @@ pub(crate) async fn backfill_sessions( "failed to checkpoint backfill at {}: {err}", config.codex_home.display() ); - if let Some(otel) = otel { - otel.counter( - DB_ERROR_METRIC, - 1, - &[("stage", "backfill_state_checkpoint")], - ); - } } else { last_watermark = Some(last_entry.watermark.clone()); } @@ -376,26 +322,19 @@ pub(crate) async fn backfill_sessions( "failed to mark backfill complete at {}: {err}", config.codex_home.display() ); - if let Some(otel) = otel { - otel.counter( - DB_ERROR_METRIC, - 1, - &[("stage", "backfill_state_mark_complete")], - ); - } } info!( "state db backfill scanned={}, upserted={}, failed={}", stats.scanned, stats.upserted, stats.failed ); - if let Some(otel) = otel { - otel.counter( + if let Some(metric_client) = metric_client { + let _ = metric_client.counter( DB_METRIC_BACKFILL, stats.upserted as i64, &[("status", "upserted")], ); - otel.counter( + let _ = metric_client.counter( DB_METRIC_BACKFILL, stats.failed as i64, &[("status", "failed")], @@ -558,7 +497,7 @@ mod tests { let mut file = File::create(&path).expect("create rollout"); writeln!(file, "{json}").expect("write rollout"); - let outcome = extract_metadata_from_rollout(&path, "openai", None) + let outcome = extract_metadata_from_rollout(&path, "openai") .await .expect("extract"); @@ -627,7 +566,7 @@ mod tests { .expect("write rollout line"); } - let outcome = extract_metadata_from_rollout(&path, "openai", None) + let outcome = extract_metadata_from_rollout(&path, "openai") .await .expect("extract"); @@ -684,7 +623,7 @@ mod tests { ); let runtime = - codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); let first_watermark = @@ -702,7 +641,7 @@ mod tests { let mut config = crate::config::test_config(); config.codex_home = codex_home.clone(); config.model_provider_id = "test-provider".to_string(); - backfill_sessions(runtime.as_ref(), &config, None).await; + backfill_sessions(runtime.as_ref(), &config).await; let first_id = ThreadId::from_string(&first_uuid.to_string()).expect("first thread id"); let second_id = ThreadId::from_string(&second_uuid.to_string()).expect("second thread id"); @@ -754,11 +693,11 @@ mod tests { ); let runtime = - codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); let thread_id = ThreadId::from_string(&thread_uuid.to_string()).expect("thread id"); - let mut existing = extract_metadata_from_rollout(&rollout_path, "test-provider", None) + let mut existing = extract_metadata_from_rollout(&rollout_path, "test-provider") .await .expect("extract") .metadata; @@ -773,7 +712,7 @@ mod tests { let mut config = crate::config::test_config(); config.codex_home = codex_home.clone(); config.model_provider_id = "test-provider".to_string(); - backfill_sessions(runtime.as_ref(), &config, None).await; + backfill_sessions(runtime.as_ref(), &config).await; let persisted = runtime .get_thread(thread_id) @@ -804,14 +743,14 @@ mod tests { ); let runtime = - codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + codex_state::StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); let mut config = crate::config::test_config(); config.codex_home = codex_home.clone(); config.model_provider_id = "test-provider".to_string(); - backfill_sessions(runtime.as_ref(), &config, None).await; + backfill_sessions(runtime.as_ref(), &config).await; let thread_id = ThreadId::from_string(&thread_uuid.to_string()).expect("thread id"); let stored = runtime diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index df150bf2b..851dcfb72 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -256,7 +256,7 @@ impl RolloutRecorder { .await? }; - let state_db_ctx = state_db::get_state_db(config, None).await; + let state_db_ctx = state_db::get_state_db(config).await; if state_db_ctx.is_none() { // Keep legacy behavior when SQLite is unavailable: return filesystem results // at the requested page size. @@ -308,7 +308,7 @@ impl RolloutRecorder { filter_cwd: Option<&Path>, ) -> std::io::Result> { let codex_home = config.codex_home.as_path(); - let state_db_ctx = state_db::get_state_db(config, None).await; + let state_db_ctx = state_db::get_state_db(config).await; if state_db_ctx.is_some() { let mut db_cursor = cursor.cloned(); loop { @@ -1061,7 +1061,7 @@ async fn resume_candidate_matches_cwd( return cwd_matches(latest_turn_context_cwd, cwd); } - metadata::extract_metadata_from_rollout(rollout_path, default_provider, None) + metadata::extract_metadata_from_rollout(rollout_path, default_provider) .await .is_ok_and(|outcome| cwd_matches(outcome.metadata.cwd.as_path(), cwd)) } @@ -1254,13 +1254,10 @@ mod tests { .enable(Feature::Sqlite) .expect("test config should allow sqlite"); - let state_db = codex_state::StateRuntime::init( - home.path().to_path_buf(), - config.model_provider_id.clone(), - None, - ) - .await - .expect("state db should initialize"); + let state_db = + StateRuntime::init(home.path().to_path_buf(), config.model_provider_id.clone()) + .await + .expect("state db should initialize"); state_db .mark_backfill_complete(None) .await @@ -1345,13 +1342,10 @@ mod tests { .enable(Feature::Sqlite) .expect("test config should allow sqlite"); - let state_db = codex_state::StateRuntime::init( - home.path().to_path_buf(), - config.model_provider_id.clone(), - None, - ) - .await - .expect("state db should initialize"); + let state_db = + StateRuntime::init(home.path().to_path_buf(), config.model_provider_id.clone()) + .await + .expect("state db should initialize"); let thread_id = ThreadId::new(); let rollout_path = home.path().join("rollout.jsonl"); let builder = ThreadMetadataBuilder::new( @@ -1457,7 +1451,6 @@ mod tests { let runtime = codex_state::StateRuntime::init( home.path().to_path_buf(), config.model_provider_id.clone(), - None, ) .await .expect("state db should initialize"); @@ -1527,7 +1520,6 @@ mod tests { let runtime = codex_state::StateRuntime::init( home.path().to_path_buf(), config.model_provider_id.clone(), - None, ) .await .expect("state db should initialize"); diff --git a/codex-rs/core/src/rollout/tests.rs b/codex-rs/core/src/rollout/tests.rs index 14266285f..12af36b78 100644 --- a/codex-rs/core/src/rollout/tests.rs +++ b/codex-rs/core/src/rollout/tests.rs @@ -57,10 +57,9 @@ async fn insert_state_db_thread( rollout_path: &Path, archived: bool, ) { - let runtime = - codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None) - .await - .expect("state db should initialize"); + let runtime = codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string()) + .await + .expect("state db should initialize"); runtime .mark_backfill_complete(None) .await @@ -248,10 +247,9 @@ async fn find_thread_path_repairs_missing_db_row_after_filesystem_fallback() { let fs_rollout_path = home.join(format!("sessions/2025/01/03/rollout-{ts}-{uuid}.jsonl")); // Create an empty state DB so lookup takes the DB-first path and then falls back to files. - let _runtime = - codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None) - .await - .expect("state db should initialize"); + let _runtime = codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string()) + .await + .expect("state db should initialize"); _runtime .mark_backfill_complete(None) .await @@ -279,10 +277,9 @@ async fn assert_state_db_rollout_path( thread_id: ThreadId, expected_path: Option<&Path>, ) { - let runtime = - codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string(), None) - .await - .expect("state db should initialize"); + let runtime = codex_state::StateRuntime::init(home.to_path_buf(), TEST_PROVIDER.to_string()) + .await + .expect("state db should initialize"); let path = runtime .find_rollout_path_by_id(thread_id, Some(false)) .await diff --git a/codex-rs/core/src/state_db.rs b/codex-rs/core/src/state_db.rs index 922331e3c..b53b748f3 100644 --- a/codex-rs/core/src/state_db.rs +++ b/codex-rs/core/src/state_db.rs @@ -7,7 +7,6 @@ use chrono::DateTime; use chrono::NaiveDateTime; use chrono::Timelike; use chrono::Utc; -use codex_otel::SessionTelemetry; use codex_protocol::ThreadId; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; @@ -26,14 +25,10 @@ pub type StateDbHandle = Arc; /// Initialize the state runtime for thread state persistence and backfill checks. To only be used /// inside `core`. The initialization should not be done anywhere else. -pub(crate) async fn init( - config: &Config, - otel: Option<&SessionTelemetry>, -) -> Option { +pub(crate) async fn init(config: &Config) -> Option { let runtime = match codex_state::StateRuntime::init( config.sqlite_home.clone(), config.model_provider_id.clone(), - otel.cloned(), ) .await { @@ -43,9 +38,6 @@ pub(crate) async fn init( "failed to initialize state runtime at {}: {err}", config.sqlite_home.display() ); - if let Some(otel) = otel { - otel.counter("codex.db.init", 1, &[("status", "init_error")]); - } return None; } }; @@ -62,20 +54,15 @@ pub(crate) async fn init( if backfill_state.status != codex_state::BackfillStatus::Complete { let runtime_for_backfill = runtime.clone(); let config = config.clone(); - let otel = otel.cloned(); tokio::spawn(async move { - metadata::backfill_sessions(runtime_for_backfill.as_ref(), &config, otel.as_ref()) - .await; + metadata::backfill_sessions(runtime_for_backfill.as_ref(), &config).await; }); } Some(runtime) } /// Get the DB if the feature is enabled and the DB exists. -pub async fn get_state_db( - config: &Config, - otel: Option<&SessionTelemetry>, -) -> Option { +pub async fn get_state_db(config: &Config) -> Option { let state_path = codex_state::state_db_path(config.sqlite_home.as_path()); if !tokio::fs::try_exists(&state_path).await.unwrap_or(false) { return None; @@ -83,7 +70,6 @@ pub async fn get_state_db( let runtime = codex_state::StateRuntime::init( config.sqlite_home.clone(), config.model_provider_id.clone(), - otel.cloned(), ) .await .ok()?; @@ -98,13 +84,10 @@ pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Optio if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) { return None; } - let runtime = codex_state::StateRuntime::init( - codex_home.to_path_buf(), - default_provider.to_string(), - None, - ) - .await - .ok()?; + let runtime = + codex_state::StateRuntime::init(codex_home.to_path_buf(), default_provider.to_string()) + .await + .ok()?; require_backfill_complete(runtime, codex_home).await } @@ -373,7 +356,7 @@ pub async fn reconcile_rollout( return; } let outcome = - match metadata::extract_metadata_from_rollout(rollout_path, default_provider, None).await { + match metadata::extract_metadata_from_rollout(rollout_path, default_provider).await { Ok(outcome) => outcome, Err(err) => { warn!( @@ -529,13 +512,7 @@ pub async fn apply_rollout_items( builder.rollout_path = rollout_path.to_path_buf(); builder.cwd = normalize_cwd_for_state_db(&builder.cwd); if let Err(err) = ctx - .apply_rollout_items( - &builder, - items, - None, - new_thread_memory_mode, - updated_at_override, - ) + .apply_rollout_items(&builder, items, new_thread_memory_mode, updated_at_override) .await { warn!( diff --git a/codex-rs/core/src/stream_events_utils.rs b/codex-rs/core/src/stream_events_utils.rs index 106f88105..8f77a80e3 100644 --- a/codex-rs/core/src/stream_events_utils.rs +++ b/codex-rs/core/src/stream_events_utils.rs @@ -129,7 +129,7 @@ async fn record_stage1_output_usage_for_completed_item( return; } - if let Some(db) = state_db::get_state_db(turn_context.config.as_ref(), None).await { + if let Some(db) = state_db::get_state_db(turn_context.config.as_ref()).await { let _ = db.record_stage1_output_usage(&thread_ids).await; } } diff --git a/codex-rs/core/tests/suite/memories.rs b/codex-rs/core/tests/suite/memories.rs index 7fb4fe0bb..df7ffafb2 100644 --- a/codex-rs/core/tests/suite/memories.rs +++ b/codex-rs/core/tests/suite/memories.rs @@ -339,8 +339,7 @@ async fn build_test_codex(server: &wiremock::MockServer, home: Arc) -> async fn init_state_db(home: &Arc) -> Result> { let db = - codex_state::StateRuntime::init(home.path().to_path_buf(), "test-provider".into(), None) - .await?; + codex_state::StateRuntime::init(home.path().to_path_buf(), "test-provider".into()).await?; db.mark_backfill_complete(None).await?; Ok(db) } diff --git a/codex-rs/core/tests/suite/rollout_list_find.rs b/codex-rs/core/tests/suite/rollout_list_find.rs index 83350a0d0..a9ea2b9c8 100644 --- a/codex-rs/core/tests/suite/rollout_list_find.rs +++ b/codex-rs/core/tests/suite/rollout_list_find.rs @@ -57,7 +57,7 @@ fn write_minimal_rollout_with_id(codex_home: &Path, id: Uuid) -> PathBuf { } async fn upsert_thread_metadata(codex_home: &Path, thread_id: ThreadId, rollout_path: PathBuf) { - let runtime = StateRuntime::init(codex_home.to_path_buf(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.to_path_buf(), "test-provider".to_string()) .await .unwrap(); runtime.mark_backfill_complete(None).await.unwrap(); diff --git a/codex-rs/state/Cargo.toml b/codex-rs/state/Cargo.toml index 837d45138..d4106da88 100644 --- a/codex-rs/state/Cargo.toml +++ b/codex-rs/state/Cargo.toml @@ -8,7 +8,6 @@ license.workspace = true anyhow = { workspace = true } chrono = { workspace = true } clap = { workspace = true, features = ["derive", "env"] } -codex-otel = { workspace = true } codex-protocol = { workspace = true } dirs = { workspace = true } log = { workspace = true } diff --git a/codex-rs/state/src/bin/logs_client.rs b/codex-rs/state/src/bin/logs_client.rs index 8b7279c73..e1d4e1faa 100644 --- a/codex-rs/state/src/bin/logs_client.rs +++ b/codex-rs/state/src/bin/logs_client.rs @@ -88,7 +88,7 @@ async fn main() -> anyhow::Result<()> { .parent() .map(ToOwned::to_owned) .unwrap_or_else(|| PathBuf::from(".")); - let runtime = StateRuntime::init(codex_home, "logs-client".to_string(), None).await?; + let runtime = StateRuntime::init(codex_home, "logs-client".to_string()).await?; let mut last_id = print_backfill(runtime.as_ref(), &filter, args.backfill, args.compact).await?; diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index 84b4462ce..e90672295 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -58,7 +58,6 @@ pub const LOGS_DB_VERSION: u32 = 1; pub const STATE_DB_FILENAME: &str = "state"; pub const STATE_DB_VERSION: u32 = 5; -const METRIC_DB_INIT: &str = "codex.db.init"; /// Errors encountered during DB operations. Tags: [stage] pub const DB_ERROR_METRIC: &str = "codex.db.error"; /// Metrics on backfill process. Tags: [status] diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs index c6f4a97ee..e533d3c89 100644 --- a/codex-rs/state/src/log_db.rs +++ b/codex-rs/state/src/log_db.rs @@ -392,7 +392,7 @@ mod tests { async fn sqlite_feedback_logs_match_feedback_formatter_shape() { let codex_home = std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4())); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); let writer = SharedWriter::default(); @@ -463,7 +463,7 @@ mod tests { async fn flush_persists_logs_for_query() { let codex_home = std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4())); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); let layer = start(runtime.clone()); diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 647565b73..7ea6a53b2 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -5,13 +5,11 @@ use crate::AgentJobItemCreateParams; use crate::AgentJobItemStatus; use crate::AgentJobProgress; use crate::AgentJobStatus; -use crate::DB_ERROR_METRIC; use crate::LOGS_DB_FILENAME; use crate::LOGS_DB_VERSION; use crate::LogEntry; use crate::LogQuery; use crate::LogRow; -use crate::METRIC_DB_INIT; use crate::STATE_DB_FILENAME; use crate::STATE_DB_VERSION; use crate::SortKey; @@ -28,7 +26,6 @@ use crate::model::datetime_to_epoch_seconds; use crate::paths::file_modified_time_utc; use chrono::DateTime; use chrono::Utc; -use codex_otel::SessionTelemetry; use codex_protocol::ThreadId; use codex_protocol::dynamic_tools::DynamicToolSpec; use codex_protocol::protocol::RolloutItem; @@ -81,11 +78,7 @@ impl StateRuntime { /// This opens (and migrates) the SQLite databases under `codex_home`, /// keeping logs in a dedicated file to reduce lock contention with the /// rest of the state store. - pub async fn init( - codex_home: PathBuf, - default_provider: String, - otel: Option, - ) -> anyhow::Result> { + pub async fn init(codex_home: PathBuf, default_provider: String) -> anyhow::Result> { tokio::fs::create_dir_all(&codex_home).await?; let current_state_name = state_db_filename(); let current_logs_name = logs_db_filename(); @@ -105,14 +98,10 @@ impl StateRuntime { .await; let state_path = state_db_path(codex_home.as_path()); let logs_path = logs_db_path(codex_home.as_path()); - let existed = tokio::fs::try_exists(&state_path).await.unwrap_or(false); let pool = match open_sqlite(&state_path, &STATE_MIGRATOR).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open state db at {}: {err}", state_path.display()); - if let Some(otel) = otel.as_ref() { - otel.counter(METRIC_DB_INIT, 1, &[("status", "open_error")]); - } return Err(err); } }; @@ -120,24 +109,15 @@ impl StateRuntime { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open logs db at {}: {err}", logs_path.display()); - if let Some(otel) = otel.as_ref() { - otel.counter(METRIC_DB_INIT, 1, &[("status", "open_error")]); - } return Err(err); } }; - if let Some(otel) = otel.as_ref() { - otel.counter(METRIC_DB_INIT, 1, &[("status", "opened")]); - } let runtime = Arc::new(Self { pool, logs_pool, codex_home, default_provider, }); - if !existed && let Some(otel) = otel.as_ref() { - otel.counter(METRIC_DB_INIT, 1, &[("status", "created")]); - } Ok(runtime) } diff --git a/codex-rs/state/src/runtime/backfill.rs b/codex-rs/state/src/runtime/backfill.rs index 93e156987..ff63eda5f 100644 --- a/codex-rs/state/src/runtime/backfill.rs +++ b/codex-rs/state/src/runtime/backfill.rs @@ -160,7 +160,7 @@ mod tests { .await .expect("write numeric"); - let _runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let _runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -207,7 +207,7 @@ mod tests { #[tokio::test] async fn backfill_state_persists_progress_and_completion() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -260,7 +260,7 @@ mod tests { #[tokio::test] async fn backfill_claim_is_singleton_until_stale_and_blocked_when_complete() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); diff --git a/codex-rs/state/src/runtime/logs.rs b/codex-rs/state/src/runtime/logs.rs index ca6d69e64..a2c2779a6 100644 --- a/codex-rs/state/src/runtime/logs.rs +++ b/codex-rs/state/src/runtime/logs.rs @@ -495,7 +495,7 @@ mod tests { #[tokio::test] async fn insert_logs_use_dedicated_log_database() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -527,7 +527,7 @@ mod tests { #[tokio::test] async fn query_logs_with_search_matches_substring() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -578,7 +578,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_old_rows_when_thread_exceeds_size_limit() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -630,7 +630,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_single_thread_row_when_it_exceeds_size_limit() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -667,7 +667,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_threadless_rows_per_process_uuid_only() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -733,7 +733,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_single_threadless_process_row_when_it_exceeds_size_limit() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -770,7 +770,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_threadless_rows_with_null_process_uuid() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -835,7 +835,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_single_threadless_null_process_row_when_it_exceeds_limit() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -872,7 +872,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_old_rows_when_thread_exceeds_row_limit() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -914,7 +914,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_old_threadless_rows_when_process_exceeds_row_limit() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -960,7 +960,7 @@ mod tests { #[tokio::test] async fn insert_logs_prunes_old_threadless_null_process_rows_when_row_limit_exceeded() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1006,7 +1006,7 @@ mod tests { #[tokio::test] async fn query_feedback_logs_returns_newest_lines_within_limit_in_order() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1068,7 +1068,7 @@ mod tests { #[tokio::test] async fn query_feedback_logs_excludes_oversized_newest_row() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); let eleven_mebibytes = "z".repeat(11 * 1024 * 1024); @@ -1116,7 +1116,7 @@ mod tests { #[tokio::test] async fn query_feedback_logs_includes_threadless_rows_from_same_process() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1190,7 +1190,7 @@ mod tests { #[tokio::test] async fn query_feedback_logs_excludes_threadless_rows_from_prior_processes() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1264,7 +1264,7 @@ mod tests { #[tokio::test] async fn query_feedback_logs_keeps_newest_suffix_across_thread_and_threadless_logs() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); let thread_marker = "thread-scoped-oldest"; diff --git a/codex-rs/state/src/runtime/memories.rs b/codex-rs/state/src/runtime/memories.rs index 67d912d7e..0072e34fe 100644 --- a/codex-rs/state/src/runtime/memories.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -1294,7 +1294,7 @@ mod tests { #[tokio::test] async fn stage1_claim_skips_when_up_to_date() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1353,7 +1353,7 @@ mod tests { #[tokio::test] async fn stage1_running_stale_can_be_stolen_but_fresh_running_is_skipped() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1399,7 +1399,7 @@ mod tests { #[tokio::test] async fn stage1_concurrent_claim_for_same_thread_is_conflict_safe() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1464,7 +1464,7 @@ mod tests { #[tokio::test] async fn stage1_concurrent_claims_respect_running_cap() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1526,7 +1526,7 @@ mod tests { #[tokio::test] async fn claim_stage1_jobs_filters_by_age_idle_and_current_thread() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1616,7 +1616,7 @@ mod tests { #[tokio::test] async fn claim_stage1_jobs_prefilters_threads_with_up_to_date_memory() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1715,7 +1715,7 @@ mod tests { #[tokio::test] async fn claim_stage1_jobs_skips_threads_with_disabled_memory_mode() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1786,7 +1786,7 @@ mod tests { #[tokio::test] async fn reset_memory_data_for_fresh_start_clears_rows_and_disables_threads() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -1895,7 +1895,7 @@ mod tests { #[tokio::test] async fn claim_stage1_jobs_enforces_global_running_cap() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2022,7 +2022,7 @@ WHERE kind = 'memory_stage1' #[tokio::test] async fn claim_stage1_jobs_processes_two_full_batches_across_startup_passes() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2109,7 +2109,7 @@ WHERE kind = 'memory_stage1' #[tokio::test] async fn stage1_output_cascades_on_thread_delete() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2176,7 +2176,7 @@ WHERE kind = 'memory_stage1' #[tokio::test] async fn mark_stage1_job_succeeded_no_output_skips_phase2_when_output_was_already_absent() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2255,7 +2255,7 @@ WHERE kind = 'memory_stage1' #[tokio::test] async fn mark_stage1_job_succeeded_no_output_enqueues_phase2_when_deleting_output() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2366,7 +2366,7 @@ WHERE kind = 'memory_stage1' #[tokio::test] async fn stage1_retry_exhaustion_does_not_block_newer_watermark() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2448,7 +2448,7 @@ WHERE kind = 'memory_stage1' #[tokio::test] async fn phase2_global_consolidation_reruns_when_watermark_advances() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2504,7 +2504,7 @@ WHERE kind = 'memory_stage1' #[tokio::test] async fn list_stage1_outputs_for_global_returns_latest_outputs() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2595,7 +2595,7 @@ WHERE kind = 'memory_stage1' #[tokio::test] async fn list_stage1_outputs_for_global_skips_empty_payloads() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2664,7 +2664,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn list_stage1_outputs_for_global_skips_polluted_threads() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2729,7 +2729,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn get_phase2_input_selection_reports_added_retained_and_removed_rows() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2841,7 +2841,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn get_phase2_input_selection_marks_polluted_previous_selection_as_removed() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -2947,7 +2947,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn mark_thread_memory_mode_polluted_enqueues_phase2_for_selected_threads() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3032,7 +3032,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn get_phase2_input_selection_treats_regenerated_selected_rows_as_added() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3148,7 +3148,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn get_phase2_input_selection_reports_regenerated_previous_selection_as_removed() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3303,7 +3303,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn mark_global_phase2_job_succeeded_updates_selected_snapshot_timestamp() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3445,7 +3445,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn mark_global_phase2_job_succeeded_only_marks_exact_selected_snapshots() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3560,7 +3560,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn record_stage1_output_usage_updates_usage_metadata() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3659,7 +3659,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn get_phase2_input_selection_prioritizes_usage_count_then_recent_usage() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3750,7 +3750,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn get_phase2_input_selection_excludes_stale_used_memories_but_keeps_fresh_never_used() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3841,7 +3841,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn get_phase2_input_selection_prefers_recent_thread_updates_over_recent_generation() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -3921,7 +3921,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn prune_stage1_outputs_for_retention_prunes_stale_unselected_rows_only() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -4057,7 +4057,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn prune_stage1_outputs_for_retention_respects_batch_limit() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -4129,7 +4129,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn mark_stage1_job_succeeded_enqueues_global_consolidation() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -4218,7 +4218,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn phase2_global_lock_allows_only_one_fresh_runner() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -4251,7 +4251,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn phase2_global_lock_stale_lease_allows_takeover() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -4318,7 +4318,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn phase2_backfilled_inputs_below_last_success_still_become_dirty() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); @@ -4377,7 +4377,7 @@ VALUES (?, ?, ?, ?, ?) #[tokio::test] async fn phase2_failure_fallback_updates_unowned_running_job() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("initialize runtime"); diff --git a/codex-rs/state/src/runtime/threads.rs b/codex-rs/state/src/runtime/threads.rs index 8825f13fe..344a89364 100644 --- a/codex-rs/state/src/runtime/threads.rs +++ b/codex-rs/state/src/runtime/threads.rs @@ -447,7 +447,6 @@ ON CONFLICT(thread_id, position) DO NOTHING &self, builder: &ThreadMetadataBuilder, items: &[RolloutItem], - otel: Option<&SessionTelemetry>, new_thread_memory_mode: Option<&str>, updated_at_override: Option>, ) -> anyhow::Result<()> { @@ -480,20 +479,12 @@ ON CONFLICT(thread_id, position) DO NOTHING } else { self.upsert_thread(&metadata).await }; - if let Err(err) = upsert_result { - if let Some(otel) = otel { - otel.counter(DB_ERROR_METRIC, 1, &[("stage", "apply_rollout_items")]); - } - return Err(err); - } + upsert_result?; if let Some(memory_mode) = extract_memory_mode(items) && let Err(err) = self .set_thread_memory_mode(builder.id, memory_mode.as_str()) .await { - if let Some(otel) = otel { - otel.counter(DB_ERROR_METRIC, 1, &[("stage", "set_thread_memory_mode")]); - } return Err(err); } let dynamic_tools = extract_dynamic_tools(items); @@ -502,9 +493,6 @@ ON CONFLICT(thread_id, position) DO NOTHING .persist_dynamic_tools(builder.id, dynamic_tools.as_deref()) .await { - if let Some(otel) = otel { - otel.counter(DB_ERROR_METRIC, 1, &[("stage", "persist_dynamic_tools")]); - } return Err(err); } Ok(()) @@ -678,7 +666,7 @@ mod tests { #[tokio::test] async fn upsert_thread_keeps_creation_memory_mode_for_existing_rows() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("state db should initialize"); let thread_id = @@ -716,7 +704,7 @@ mod tests { #[tokio::test] async fn apply_rollout_items_restores_memory_mode_from_session_meta() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("state db should initialize"); let thread_id = @@ -754,7 +742,7 @@ mod tests { })]; runtime - .apply_rollout_items(&builder, &items, None, None, None) + .apply_rollout_items(&builder, &items, None, None) .await .expect("apply_rollout_items should succeed"); @@ -768,7 +756,7 @@ mod tests { #[tokio::test] async fn apply_rollout_items_preserves_existing_git_branch_and_fills_missing_git_fields() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("state db should initialize"); let thread_id = @@ -812,7 +800,7 @@ mod tests { })]; runtime - .apply_rollout_items(&builder, &items, None, None, None) + .apply_rollout_items(&builder, &items, None, None) .await .expect("apply_rollout_items should succeed"); @@ -832,7 +820,7 @@ mod tests { #[tokio::test] async fn update_thread_git_info_preserves_newer_non_git_metadata() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("state db should initialize"); let thread_id = @@ -891,7 +879,7 @@ mod tests { #[tokio::test] async fn insert_thread_if_absent_preserves_existing_metadata() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("state db should initialize"); let thread_id = @@ -936,7 +924,7 @@ mod tests { #[tokio::test] async fn update_thread_git_info_can_clear_fields() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("state db should initialize"); let thread_id = @@ -970,7 +958,7 @@ mod tests { #[tokio::test] async fn touch_thread_updated_at_updates_only_updated_at() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("state db should initialize"); let thread_id = @@ -1007,7 +995,7 @@ mod tests { #[tokio::test] async fn apply_rollout_items_uses_override_updated_at_when_provided() { let codex_home = unique_temp_dir(); - let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await .expect("state db should initialize"); let thread_id = @@ -1045,7 +1033,7 @@ mod tests { DateTime::::from_timestamp(1_700_001_234, 0).expect("timestamp"); runtime - .apply_rollout_items(&builder, &items, None, None, Some(override_updated_at)) + .apply_rollout_items(&builder, &items, None, Some(override_updated_at)) .await .expect("apply_rollout_items should succeed"); diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 3108cb938..43e0e2afe 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -504,7 +504,7 @@ pub async fn run_main(mut cli: Cli, arg0_paths: Arg0DispatchPaths) -> std::io::R let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer()); - let log_db_layer = codex_core::state_db::get_state_db(&config, None) + let log_db_layer = codex_core::state_db::get_state_db(&config) .await .map(|db| log_db::start(db).with_filter(env_filter())); @@ -961,7 +961,7 @@ pub(crate) async fn read_session_cwd( thread_id: ThreadId, path: &Path, ) -> Option { - if let Some(state_db_ctx) = get_state_db(config, None).await + if let Some(state_db_ctx) = get_state_db(config).await && let Ok(Some(metadata)) = state_db_ctx.get_thread(thread_id).await { return Some(metadata.cwd); @@ -1519,7 +1519,6 @@ trust_level = "untrusted" let runtime = codex_state::StateRuntime::init( config.codex_home.clone(), config.model_provider_id.clone(), - None, ) .await .map_err(std::io::Error::other)?;