diff --git a/codex-rs/state/logs_migrations/0001_logs.sql b/codex-rs/state/logs_migrations/0001_logs.sql new file mode 100644 index 000000000..b5315dc82 --- /dev/null +++ b/codex-rs/state/logs_migrations/0001_logs.sql @@ -0,0 +1,21 @@ +CREATE TABLE logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts INTEGER NOT NULL, + ts_nanos INTEGER NOT NULL, + level TEXT NOT NULL, + target TEXT NOT NULL, + message TEXT, + module_path TEXT, + file TEXT, + line INTEGER, + thread_id TEXT, + process_uuid TEXT, + estimated_bytes INTEGER NOT NULL DEFAULT 0 +); + +CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC); +CREATE INDEX idx_logs_thread_id ON logs(thread_id); +CREATE INDEX idx_logs_process_uuid ON logs(process_uuid); +CREATE INDEX idx_logs_thread_id_ts ON logs(thread_id, ts DESC, ts_nanos DESC, id DESC); +CREATE INDEX idx_logs_process_uuid_threadless_ts ON logs(process_uuid, ts DESC, ts_nanos DESC, id DESC) +WHERE thread_id IS NULL; diff --git a/codex-rs/state/src/bin/logs_client.rs b/codex-rs/state/src/bin/logs_client.rs index 6b0039ee9..8b7279c73 100644 --- a/codex-rs/state/src/bin/logs_client.rs +++ b/codex-rs/state/src/bin/logs_client.rs @@ -12,13 +12,13 @@ use owo_colors::OwoColorize; #[derive(Debug, Parser)] #[command(name = "codex-state-logs")] -#[command(about = "Tail Codex logs from the state SQLite DB with simple filters")] +#[command(about = "Tail Codex logs from the dedicated logs SQLite DB with simple filters")] struct Args { /// Path to CODEX_HOME. Defaults to $CODEX_HOME or ~/.codex. #[arg(long, env = "CODEX_HOME")] codex_home: Option<PathBuf>, - /// Direct path to the SQLite database. Overrides --codex-home. + /// Direct path to the logs SQLite database. Overrides --codex-home. 
#[arg(long)] db: Option<PathBuf>, @@ -113,7 +113,7 @@ fn resolve_db_path(args: &Args) -> anyhow::Result<PathBuf> { } let codex_home = args.codex_home.clone().unwrap_or_else(default_codex_home); - Ok(codex_state::state_db_path(codex_home.as_path())) + Ok(codex_state::logs_db_path(codex_home.as_path())) } fn default_codex_home() -> PathBuf { diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index a4c717226..fa0d74c59 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -44,12 +44,16 @@ pub use model::Stage1StartupClaimParams; pub use model::ThreadMetadata; pub use model::ThreadMetadataBuilder; pub use model::ThreadsPage; +pub use runtime::logs_db_filename; +pub use runtime::logs_db_path; pub use runtime::state_db_filename; pub use runtime::state_db_path; /// Environment variable for overriding the SQLite state database home directory. pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME"; +pub const LOGS_DB_FILENAME: &str = "logs"; +pub const LOGS_DB_VERSION: u32 = 1; pub const STATE_DB_FILENAME: &str = "state"; pub const STATE_DB_VERSION: u32 = 5; diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs index 4fe9be184..413b07e50 100644 --- a/codex-rs/state/src/log_db.rs +++ b/codex-rs/state/src/log_db.rs @@ -1,7 +1,7 @@ //! Tracing log export into the state SQLite database. //! //! This module provides a `tracing_subscriber::Layer` that captures events and -//! inserts them into the `logs` table in `state.sqlite`. The writer runs in a +//! inserts them into the dedicated `logs` SQLite database. The writer runs in a //! background task and batches inserts to keep logging overhead low. //! //! 
## Usage diff --git a/codex-rs/state/src/migrations.rs b/codex-rs/state/src/migrations.rs index 24b310224..6e7b9d363 100644 --- a/codex-rs/state/src/migrations.rs +++ b/codex-rs/state/src/migrations.rs @@ -1,3 +1,4 @@ use sqlx::migrate::Migrator; -pub(crate) static MIGRATOR: Migrator = sqlx::migrate!("./migrations"); +pub(crate) static STATE_MIGRATOR: Migrator = sqlx::migrate!("./migrations"); +pub(crate) static LOGS_MIGRATOR: Migrator = sqlx::migrate!("./logs_migrations"); diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 0be70a5b4..a32f041ce 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -6,6 +6,8 @@ use crate::AgentJobItemStatus; use crate::AgentJobProgress; use crate::AgentJobStatus; use crate::DB_ERROR_METRIC; +use crate::LOGS_DB_FILENAME; +use crate::LOGS_DB_VERSION; use crate::LogEntry; use crate::LogQuery; use crate::LogRow; @@ -17,7 +19,8 @@ use crate::ThreadMetadata; use crate::ThreadMetadataBuilder; use crate::ThreadsPage; use crate::apply_rollout_item; -use crate::migrations::MIGRATOR; +use crate::migrations::LOGS_MIGRATOR; +use crate::migrations::STATE_MIGRATOR; use crate::model::AgentJobRow; use crate::model::ThreadRow; use crate::model::anchor_from_item; @@ -37,6 +40,7 @@ use sqlx::Row; use sqlx::Sqlite; use sqlx::SqliteConnection; use sqlx::SqlitePool; +use sqlx::migrate::Migrator; use sqlx::sqlite::SqliteConnectOptions; use sqlx::sqlite::SqliteJournalMode; use sqlx::sqlite::SqlitePoolOptions; @@ -68,22 +72,41 @@ pub struct StateRuntime { codex_home: PathBuf, default_provider: String, pool: Arc<SqlitePool>, + logs_pool: Arc<SqlitePool>, } impl StateRuntime { /// Initialize the state runtime using the provided Codex home and default provider. /// - /// This opens (and migrates) the SQLite database at `codex_home/state.sqlite`. + /// This opens (and migrates) the SQLite databases under `codex_home`, + /// keeping logs in a dedicated file to reduce lock contention with the + /// rest of the state store. 
pub async fn init( codex_home: PathBuf, default_provider: String, otel: Option, ) -> anyhow::Result<Arc<Self>> { tokio::fs::create_dir_all(&codex_home).await?; - remove_legacy_state_files(&codex_home).await; + let current_state_name = state_db_filename(); + let current_logs_name = logs_db_filename(); + remove_legacy_db_files( + &codex_home, + current_state_name.as_str(), + STATE_DB_FILENAME, + "state", + ) + .await; + remove_legacy_db_files( + &codex_home, + current_logs_name.as_str(), + LOGS_DB_FILENAME, + "logs", + ) + .await; let state_path = state_db_path(codex_home.as_path()); + let logs_path = logs_db_path(codex_home.as_path()); let existed = tokio::fs::try_exists(&state_path).await.unwrap_or(false); - let pool = match open_sqlite(&state_path).await { + let pool = match open_sqlite(&state_path, &STATE_MIGRATOR).await { Ok(db) => Arc::new(db), Err(err) => { warn!("failed to open state db at {}: {err}", state_path.display()); @@ -93,11 +116,22 @@ impl StateRuntime { return Err(err); } }; + let logs_pool = match open_sqlite(&logs_path, &LOGS_MIGRATOR).await { + Ok(db) => Arc::new(db), + Err(err) => { + warn!("failed to open logs db at {}: {err}", logs_path.display()); + if let Some(otel) = otel.as_ref() { + otel.counter(METRIC_DB_INIT, 1, &[("status", "open_error")]); + } + return Err(err); + } + }; if let Some(otel) = otel.as_ref() { otel.counter(METRIC_DB_INIT, 1, &[("status", "opened")]); } let runtime = Arc::new(Self { pool, + logs_pool, codex_home, default_provider, }); @@ -113,7 +147,7 @@ impl StateRuntime { } } -async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> { +async fn open_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> { let options = SqliteConnectOptions::new() .filename(path) .create_if_missing(true) @@ -125,26 +159,42 @@ async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> { .max_connections(5) .connect_with(options) .await?; - MIGRATOR.run(&pool).await?; + migrator.run(&pool).await?; Ok(pool) } +fn db_filename(base_name: &str, version: 
u32) -> String { + format!("{base_name}_{version}.sqlite") +} + pub fn state_db_filename() -> String { - format!("{STATE_DB_FILENAME}_{STATE_DB_VERSION}.sqlite") + db_filename(STATE_DB_FILENAME, STATE_DB_VERSION) } pub fn state_db_path(codex_home: &Path) -> PathBuf { codex_home.join(state_db_filename()) } -async fn remove_legacy_state_files(codex_home: &Path) { - let current_name = state_db_filename(); +pub fn logs_db_filename() -> String { + db_filename(LOGS_DB_FILENAME, LOGS_DB_VERSION) +} + +pub fn logs_db_path(codex_home: &Path) -> PathBuf { + codex_home.join(logs_db_filename()) +} + +async fn remove_legacy_db_files( + codex_home: &Path, + current_name: &str, + base_name: &str, + db_label: &str, +) { let mut entries = match tokio::fs::read_dir(codex_home).await { Ok(entries) => entries, Err(err) => { warn!( - "failed to read codex_home for state db cleanup {}: {err}", - codex_home.display() + "failed to read codex_home for {db_label} db cleanup {}: {err}", + codex_home.display(), ); return; } @@ -160,37 +210,37 @@ async fn remove_legacy_state_files(codex_home: &Path) { } let file_name = entry.file_name(); let file_name = file_name.to_string_lossy(); - if !should_remove_state_file(file_name.as_ref(), current_name.as_str()) { + if !should_remove_db_file(file_name.as_ref(), current_name, base_name) { continue; } let legacy_path = entry.path(); if let Err(err) = tokio::fs::remove_file(&legacy_path).await { warn!( - "failed to remove legacy state db file {}: {err}", - legacy_path.display() + "failed to remove legacy {db_label} db file {}: {err}", + legacy_path.display(), ); } } } -fn should_remove_state_file(file_name: &str, current_name: &str) -> bool { - let mut base_name = file_name; +fn should_remove_db_file(file_name: &str, current_name: &str, base_name: &str) -> bool { + let mut normalized_name = file_name; for suffix in ["-wal", "-shm", "-journal"] { if let Some(stripped) = file_name.strip_suffix(suffix) { - base_name = stripped; + normalized_name = stripped; 
break; } } - if base_name == current_name { + if normalized_name == current_name { return false; } - let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite"); - if base_name == unversioned_name { + let unversioned_name = format!("{base_name}.sqlite"); + if normalized_name == unversioned_name { return true; } - let Some(version_with_extension) = base_name.strip_prefix(&format!("{STATE_DB_FILENAME}_")) + let Some(version_with_extension) = normalized_name.strip_prefix(&format!("{base_name}_")) else { return false; }; diff --git a/codex-rs/state/src/runtime/logs.rs b/codex-rs/state/src/runtime/logs.rs index 88cbc22d5..ca6d69e64 100644 --- a/codex-rs/state/src/runtime/logs.rs +++ b/codex-rs/state/src/runtime/logs.rs @@ -11,7 +11,7 @@ impl StateRuntime { return Ok(()); } - let mut tx = self.pool.begin().await?; + let mut tx = self.logs_pool.begin().await?; let mut builder = QueryBuilder::::new( "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line, estimated_bytes) ", ); @@ -281,7 +281,7 @@ WHERE id IN ( pub(crate) async fn delete_logs_before(&self, cutoff_ts: i64) -> anyhow::Result { let result = sqlx::query("DELETE FROM logs WHERE ts < ?") .bind(cutoff_ts) - .execute(self.pool.as_ref()) + .execute(self.logs_pool.as_ref()) .await?; Ok(result.rows_affected()) } @@ -303,7 +303,7 @@ WHERE id IN ( let rows = builder .build_query_as::() - .fetch_all(self.pool.as_ref()) + .fetch_all(self.logs_pool.as_ref()) .await?; Ok(rows) } @@ -377,7 +377,7 @@ ORDER BY ts ASC, ts_nanos ASC, id ASC .bind(thread_id) .bind(thread_id) .bind(max_bytes) - .fetch_all(self.pool.as_ref()) + .fetch_all(self.logs_pool.as_ref()) .await?; Ok(lines.concat().into_bytes()) @@ -388,7 +388,7 @@ ORDER BY ts ASC, ts_nanos ASC, id ASC let mut builder = QueryBuilder::::new("SELECT MAX(id) AS max_id FROM logs WHERE 1 = 1"); push_log_filters(&mut builder, query); - let row = builder.build().fetch_one(self.pool.as_ref()).await?; + let row = 
builder.build().fetch_one(self.logs_pool.as_ref()).await?; let max_id: Option<i64> = row.try_get("max_id")?; Ok(max_id.unwrap_or(0)) } @@ -465,7 +465,65 @@ mod tests { use super::test_support::unique_temp_dir; use crate::LogEntry; use crate::LogQuery; + use crate::logs_db_path; + use crate::state_db_path; use pretty_assertions::assert_eq; + use sqlx::SqlitePool; + use sqlx::sqlite::SqliteConnectOptions; + use std::path::Path; + + async fn open_db_pool(path: &Path) -> SqlitePool { + SqlitePool::connect_with( + SqliteConnectOptions::new() + .filename(path) + .create_if_missing(false), + ) + .await + .expect("open sqlite pool") + } + + async fn log_row_count(path: &Path) -> i64 { + let pool = open_db_pool(path).await; + let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM logs") + .fetch_one(&pool) + .await + .expect("count log rows"); + pool.close().await; + count + } + + #[tokio::test] + async fn insert_logs_use_dedicated_log_database() { + let codex_home = unique_temp_dir(); + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None) + .await + .expect("initialize runtime"); + + runtime + .insert_logs(&[LogEntry { + ts: 1, + ts_nanos: 0, + level: "INFO".to_string(), + target: "cli".to_string(), + message: Some("dedicated-log-db".to_string()), + thread_id: Some("thread-1".to_string()), + process_uuid: Some("proc-1".to_string()), + module_path: Some("mod".to_string()), + file: Some("main.rs".to_string()), + line: Some(7), + }]) + .await + .expect("insert test logs"); + + let state_count = log_row_count(state_db_path(codex_home.as_path()).as_path()).await; + let logs_count = log_row_count(logs_db_path(codex_home.as_path()).as_path()).await; + + assert_eq!(state_count, 0); + assert_eq!(logs_count, 1); + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + #[tokio::test] async fn query_logs_with_search_matches_substring() { let codex_home = unique_temp_dir();