Move sqlite logs to a dedicated database (#13772)
## Summary

- Move sqlite log reads and writes onto a dedicated `logs_1.sqlite` database to reduce lock contention with the main state DB.
- Add a dedicated logs migrator and route `codex-state-logs` to the new database path.
- Leave the old `logs` table in the existing state DB untouched for now.

## Testing

- `just fmt`
- `cargo test -p codex-state`

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
parent
51fcdc760d
commit
4e6c6193a1
7 changed files with 165 additions and 31 deletions
21
codex-rs/state/logs_migrations/0001_logs.sql
Normal file
21
codex-rs/state/logs_migrations/0001_logs.sql
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
-- Schema for the dedicated logs SQLite database (logs_<version>.sqlite),
-- kept separate from the main state DB to reduce lock contention.
CREATE TABLE logs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    -- Timestamp split into seconds and a nanosecond component; together with
    -- `id` they form the (ts, ts_nanos, id) ordering used by the indexes below.
    ts INTEGER NOT NULL,
    ts_nanos INTEGER NOT NULL,
    level TEXT NOT NULL,
    target TEXT NOT NULL,
    message TEXT,
    module_path TEXT,
    file TEXT,
    line INTEGER,
    thread_id TEXT,
    process_uuid TEXT,
    -- Approximate on-disk size of the entry; presumably used to enforce a
    -- byte budget when exporting/pruning logs — TODO confirm against callers.
    estimated_bytes INTEGER NOT NULL DEFAULT 0
);

-- Newest-first scans across all logs.
CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC);
-- Simple equality lookups by thread / process.
CREATE INDEX idx_logs_thread_id ON logs(thread_id);
CREATE INDEX idx_logs_process_uuid ON logs(process_uuid);
-- Newest-first scans restricted to a single thread.
CREATE INDEX idx_logs_thread_id_ts ON logs(thread_id, ts DESC, ts_nanos DESC, id DESC);
-- Partial index for process-level entries that carry no thread id.
CREATE INDEX idx_logs_process_uuid_threadless_ts ON logs(process_uuid, ts DESC, ts_nanos DESC, id DESC)
WHERE thread_id IS NULL;
|
||||
|
|
@ -12,13 +12,13 @@ use owo_colors::OwoColorize;
|
|||
|
||||
#[derive(Debug, Parser)]
|
||||
#[command(name = "codex-state-logs")]
|
||||
#[command(about = "Tail Codex logs from the state SQLite DB with simple filters")]
|
||||
#[command(about = "Tail Codex logs from the dedicated logs SQLite DB with simple filters")]
|
||||
struct Args {
|
||||
/// Path to CODEX_HOME. Defaults to $CODEX_HOME or ~/.codex.
|
||||
#[arg(long, env = "CODEX_HOME")]
|
||||
codex_home: Option<PathBuf>,
|
||||
|
||||
/// Direct path to the SQLite database. Overrides --codex-home.
|
||||
/// Direct path to the logs SQLite database. Overrides --codex-home.
|
||||
#[arg(long)]
|
||||
db: Option<PathBuf>,
|
||||
|
||||
|
|
@ -113,7 +113,7 @@ fn resolve_db_path(args: &Args) -> anyhow::Result<PathBuf> {
|
|||
}
|
||||
|
||||
let codex_home = args.codex_home.clone().unwrap_or_else(default_codex_home);
|
||||
Ok(codex_state::state_db_path(codex_home.as_path()))
|
||||
Ok(codex_state::logs_db_path(codex_home.as_path()))
|
||||
}
|
||||
|
||||
fn default_codex_home() -> PathBuf {
|
||||
|
|
|
|||
|
|
@ -44,12 +44,16 @@ pub use model::Stage1StartupClaimParams;
|
|||
pub use model::ThreadMetadata;
|
||||
pub use model::ThreadMetadataBuilder;
|
||||
pub use model::ThreadsPage;
|
||||
pub use runtime::logs_db_filename;
|
||||
pub use runtime::logs_db_path;
|
||||
pub use runtime::state_db_filename;
|
||||
pub use runtime::state_db_path;
|
||||
|
||||
/// Environment variable for overriding the SQLite state database home directory.
|
||||
pub const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
|
||||
|
||||
pub const LOGS_DB_FILENAME: &str = "logs";
|
||||
pub const LOGS_DB_VERSION: u32 = 1;
|
||||
pub const STATE_DB_FILENAME: &str = "state";
|
||||
pub const STATE_DB_VERSION: u32 = 5;
|
||||
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
//! Tracing log export into the state SQLite database.
|
||||
//!
|
||||
//! This module provides a `tracing_subscriber::Layer` that captures events and
|
||||
//! inserts them into the `logs` table in `state.sqlite`. The writer runs in a
|
||||
//! inserts them into the dedicated `logs` SQLite database. The writer runs in a
|
||||
//! background task and batches inserts to keep logging overhead low.
|
||||
//!
|
||||
//! ## Usage
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
use sqlx::migrate::Migrator;
|
||||
|
||||
pub(crate) static MIGRATOR: Migrator = sqlx::migrate!("./migrations");
|
||||
pub(crate) static STATE_MIGRATOR: Migrator = sqlx::migrate!("./migrations");
|
||||
pub(crate) static LOGS_MIGRATOR: Migrator = sqlx::migrate!("./logs_migrations");
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ use crate::AgentJobItemStatus;
|
|||
use crate::AgentJobProgress;
|
||||
use crate::AgentJobStatus;
|
||||
use crate::DB_ERROR_METRIC;
|
||||
use crate::LOGS_DB_FILENAME;
|
||||
use crate::LOGS_DB_VERSION;
|
||||
use crate::LogEntry;
|
||||
use crate::LogQuery;
|
||||
use crate::LogRow;
|
||||
|
|
@ -17,7 +19,8 @@ use crate::ThreadMetadata;
|
|||
use crate::ThreadMetadataBuilder;
|
||||
use crate::ThreadsPage;
|
||||
use crate::apply_rollout_item;
|
||||
use crate::migrations::MIGRATOR;
|
||||
use crate::migrations::LOGS_MIGRATOR;
|
||||
use crate::migrations::STATE_MIGRATOR;
|
||||
use crate::model::AgentJobRow;
|
||||
use crate::model::ThreadRow;
|
||||
use crate::model::anchor_from_item;
|
||||
|
|
@ -37,6 +40,7 @@ use sqlx::Row;
|
|||
use sqlx::Sqlite;
|
||||
use sqlx::SqliteConnection;
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::migrate::Migrator;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use sqlx::sqlite::SqliteJournalMode;
|
||||
use sqlx::sqlite::SqlitePoolOptions;
|
||||
|
|
@ -68,22 +72,41 @@ pub struct StateRuntime {
|
|||
codex_home: PathBuf,
|
||||
default_provider: String,
|
||||
pool: Arc<sqlx::SqlitePool>,
|
||||
logs_pool: Arc<sqlx::SqlitePool>,
|
||||
}
|
||||
|
||||
impl StateRuntime {
|
||||
/// Initialize the state runtime using the provided Codex home and default provider.
|
||||
///
|
||||
/// This opens (and migrates) the SQLite database at `codex_home/state.sqlite`.
|
||||
/// This opens (and migrates) the SQLite databases under `codex_home`,
|
||||
/// keeping logs in a dedicated file to reduce lock contention with the
|
||||
/// rest of the state store.
|
||||
pub async fn init(
|
||||
codex_home: PathBuf,
|
||||
default_provider: String,
|
||||
otel: Option<OtelManager>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
tokio::fs::create_dir_all(&codex_home).await?;
|
||||
remove_legacy_state_files(&codex_home).await;
|
||||
let current_state_name = state_db_filename();
|
||||
let current_logs_name = logs_db_filename();
|
||||
remove_legacy_db_files(
|
||||
&codex_home,
|
||||
current_state_name.as_str(),
|
||||
STATE_DB_FILENAME,
|
||||
"state",
|
||||
)
|
||||
.await;
|
||||
remove_legacy_db_files(
|
||||
&codex_home,
|
||||
current_logs_name.as_str(),
|
||||
LOGS_DB_FILENAME,
|
||||
"logs",
|
||||
)
|
||||
.await;
|
||||
let state_path = state_db_path(codex_home.as_path());
|
||||
let logs_path = logs_db_path(codex_home.as_path());
|
||||
let existed = tokio::fs::try_exists(&state_path).await.unwrap_or(false);
|
||||
let pool = match open_sqlite(&state_path).await {
|
||||
let pool = match open_sqlite(&state_path, &STATE_MIGRATOR).await {
|
||||
Ok(db) => Arc::new(db),
|
||||
Err(err) => {
|
||||
warn!("failed to open state db at {}: {err}", state_path.display());
|
||||
|
|
@ -93,11 +116,22 @@ impl StateRuntime {
|
|||
return Err(err);
|
||||
}
|
||||
};
|
||||
let logs_pool = match open_sqlite(&logs_path, &LOGS_MIGRATOR).await {
|
||||
Ok(db) => Arc::new(db),
|
||||
Err(err) => {
|
||||
warn!("failed to open logs db at {}: {err}", logs_path.display());
|
||||
if let Some(otel) = otel.as_ref() {
|
||||
otel.counter(METRIC_DB_INIT, 1, &[("status", "open_error")]);
|
||||
}
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
if let Some(otel) = otel.as_ref() {
|
||||
otel.counter(METRIC_DB_INIT, 1, &[("status", "opened")]);
|
||||
}
|
||||
let runtime = Arc::new(Self {
|
||||
pool,
|
||||
logs_pool,
|
||||
codex_home,
|
||||
default_provider,
|
||||
});
|
||||
|
|
@ -113,7 +147,7 @@ impl StateRuntime {
|
|||
}
|
||||
}
|
||||
|
||||
async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
|
||||
async fn open_sqlite(path: &Path, migrator: &'static Migrator) -> anyhow::Result<SqlitePool> {
|
||||
let options = SqliteConnectOptions::new()
|
||||
.filename(path)
|
||||
.create_if_missing(true)
|
||||
|
|
@ -125,26 +159,42 @@ async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
|
|||
.max_connections(5)
|
||||
.connect_with(options)
|
||||
.await?;
|
||||
MIGRATOR.run(&pool).await?;
|
||||
migrator.run(&pool).await?;
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
fn db_filename(base_name: &str, version: u32) -> String {
|
||||
format!("{base_name}_{version}.sqlite")
|
||||
}
|
||||
|
||||
pub fn state_db_filename() -> String {
|
||||
format!("{STATE_DB_FILENAME}_{STATE_DB_VERSION}.sqlite")
|
||||
db_filename(STATE_DB_FILENAME, STATE_DB_VERSION)
|
||||
}
|
||||
|
||||
pub fn state_db_path(codex_home: &Path) -> PathBuf {
|
||||
codex_home.join(state_db_filename())
|
||||
}
|
||||
|
||||
async fn remove_legacy_state_files(codex_home: &Path) {
|
||||
let current_name = state_db_filename();
|
||||
pub fn logs_db_filename() -> String {
|
||||
db_filename(LOGS_DB_FILENAME, LOGS_DB_VERSION)
|
||||
}
|
||||
|
||||
pub fn logs_db_path(codex_home: &Path) -> PathBuf {
|
||||
codex_home.join(logs_db_filename())
|
||||
}
|
||||
|
||||
async fn remove_legacy_db_files(
|
||||
codex_home: &Path,
|
||||
current_name: &str,
|
||||
base_name: &str,
|
||||
db_label: &str,
|
||||
) {
|
||||
let mut entries = match tokio::fs::read_dir(codex_home).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read codex_home for state db cleanup {}: {err}",
|
||||
codex_home.display()
|
||||
"failed to read codex_home for {db_label} db cleanup {}: {err}",
|
||||
codex_home.display(),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
|
@ -160,37 +210,37 @@ async fn remove_legacy_state_files(codex_home: &Path) {
|
|||
}
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
if !should_remove_state_file(file_name.as_ref(), current_name.as_str()) {
|
||||
if !should_remove_db_file(file_name.as_ref(), current_name, base_name) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let legacy_path = entry.path();
|
||||
if let Err(err) = tokio::fs::remove_file(&legacy_path).await {
|
||||
warn!(
|
||||
"failed to remove legacy state db file {}: {err}",
|
||||
legacy_path.display()
|
||||
"failed to remove legacy {db_label} db file {}: {err}",
|
||||
legacy_path.display(),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn should_remove_state_file(file_name: &str, current_name: &str) -> bool {
|
||||
let mut base_name = file_name;
|
||||
fn should_remove_db_file(file_name: &str, current_name: &str, base_name: &str) -> bool {
|
||||
let mut normalized_name = file_name;
|
||||
for suffix in ["-wal", "-shm", "-journal"] {
|
||||
if let Some(stripped) = file_name.strip_suffix(suffix) {
|
||||
base_name = stripped;
|
||||
normalized_name = stripped;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if base_name == current_name {
|
||||
if normalized_name == current_name {
|
||||
return false;
|
||||
}
|
||||
let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite");
|
||||
if base_name == unversioned_name {
|
||||
let unversioned_name = format!("{base_name}.sqlite");
|
||||
if normalized_name == unversioned_name {
|
||||
return true;
|
||||
}
|
||||
|
||||
let Some(version_with_extension) = base_name.strip_prefix(&format!("{STATE_DB_FILENAME}_"))
|
||||
let Some(version_with_extension) = normalized_name.strip_prefix(&format!("{base_name}_"))
|
||||
else {
|
||||
return false;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ impl StateRuntime {
|
|||
return Ok(());
|
||||
}
|
||||
|
||||
let mut tx = self.pool.begin().await?;
|
||||
let mut tx = self.logs_pool.begin().await?;
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line, estimated_bytes) ",
|
||||
);
|
||||
|
|
@ -281,7 +281,7 @@ WHERE id IN (
|
|||
pub(crate) async fn delete_logs_before(&self, cutoff_ts: i64) -> anyhow::Result<u64> {
|
||||
let result = sqlx::query("DELETE FROM logs WHERE ts < ?")
|
||||
.bind(cutoff_ts)
|
||||
.execute(self.pool.as_ref())
|
||||
.execute(self.logs_pool.as_ref())
|
||||
.await?;
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
|
|
@ -303,7 +303,7 @@ WHERE id IN (
|
|||
|
||||
let rows = builder
|
||||
.build_query_as::<LogRow>()
|
||||
.fetch_all(self.pool.as_ref())
|
||||
.fetch_all(self.logs_pool.as_ref())
|
||||
.await?;
|
||||
Ok(rows)
|
||||
}
|
||||
|
|
@ -377,7 +377,7 @@ ORDER BY ts ASC, ts_nanos ASC, id ASC
|
|||
.bind(thread_id)
|
||||
.bind(thread_id)
|
||||
.bind(max_bytes)
|
||||
.fetch_all(self.pool.as_ref())
|
||||
.fetch_all(self.logs_pool.as_ref())
|
||||
.await?;
|
||||
|
||||
Ok(lines.concat().into_bytes())
|
||||
|
|
@ -388,7 +388,7 @@ ORDER BY ts ASC, ts_nanos ASC, id ASC
|
|||
let mut builder =
|
||||
QueryBuilder::<Sqlite>::new("SELECT MAX(id) AS max_id FROM logs WHERE 1 = 1");
|
||||
push_log_filters(&mut builder, query);
|
||||
let row = builder.build().fetch_one(self.pool.as_ref()).await?;
|
||||
let row = builder.build().fetch_one(self.logs_pool.as_ref()).await?;
|
||||
let max_id: Option<i64> = row.try_get("max_id")?;
|
||||
Ok(max_id.unwrap_or(0))
|
||||
}
|
||||
|
|
@ -465,7 +465,65 @@ mod tests {
|
|||
use super::test_support::unique_temp_dir;
|
||||
use crate::LogEntry;
|
||||
use crate::LogQuery;
|
||||
use crate::logs_db_path;
|
||||
use crate::state_db_path;
|
||||
use pretty_assertions::assert_eq;
|
||||
use sqlx::SqlitePool;
|
||||
use sqlx::sqlite::SqliteConnectOptions;
|
||||
use std::path::Path;
|
||||
|
||||
async fn open_db_pool(path: &Path) -> SqlitePool {
|
||||
SqlitePool::connect_with(
|
||||
SqliteConnectOptions::new()
|
||||
.filename(path)
|
||||
.create_if_missing(false),
|
||||
)
|
||||
.await
|
||||
.expect("open sqlite pool")
|
||||
}
|
||||
|
||||
async fn log_row_count(path: &Path) -> i64 {
|
||||
let pool = open_db_pool(path).await;
|
||||
let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM logs")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.expect("count log rows");
|
||||
pool.close().await;
|
||||
count
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_use_dedicated_log_database() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
runtime
|
||||
.insert_logs(&[LogEntry {
|
||||
ts: 1,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some("dedicated-log-db".to_string()),
|
||||
thread_id: Some("thread-1".to_string()),
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
module_path: Some("mod".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(7),
|
||||
}])
|
||||
.await
|
||||
.expect("insert test logs");
|
||||
|
||||
let state_count = log_row_count(state_db_path(codex_home.as_path()).as_path()).await;
|
||||
let logs_count = log_row_count(logs_db_path(codex_home.as_path()).as_path()).await;
|
||||
|
||||
assert_eq!(state_count, 0);
|
||||
assert_eq!(logs_count, 1);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn query_logs_with_search_matches_substring() {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue