Migrate state DB path helpers to versioned filename (#10623)
Summary - add versioned state sqlite filename helpers and re-export them from the state crate - remove legacy state files when initializing the runtime and update consumers/tests to use the new helpers - tweak logs client description and database resolution to match the new path
This commit is contained in:
parent
df000da917
commit
583e5d4f41
5 changed files with 180 additions and 13 deletions
|
|
@ -14,7 +14,6 @@ use codex_protocol::protocol::RolloutItem;
|
|||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_state::DB_METRIC_COMPARE_ERROR;
|
||||
pub use codex_state::LogEntry;
|
||||
use codex_state::STATE_DB_FILENAME;
|
||||
use codex_state::ThreadMetadataBuilder;
|
||||
use serde_json::Value;
|
||||
use std::path::Path;
|
||||
|
|
@ -32,7 +31,7 @@ pub(crate) async fn init_if_enabled(
|
|||
config: &Config,
|
||||
otel: Option<&OtelManager>,
|
||||
) -> Option<StateDbHandle> {
|
||||
let state_path = config.codex_home.join(STATE_DB_FILENAME);
|
||||
let state_path = codex_state::state_db_path(config.codex_home.as_path());
|
||||
if !config.features.enabled(Feature::Sqlite) {
|
||||
return None;
|
||||
}
|
||||
|
|
@ -74,7 +73,7 @@ pub(crate) async fn init_if_enabled(
|
|||
|
||||
/// Get the DB if the feature is enabled and the DB exists.
|
||||
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
|
||||
let state_path = config.codex_home.join(STATE_DB_FILENAME);
|
||||
let state_path = codex_state::state_db_path(config.codex_home.as_path());
|
||||
if !config.features.enabled(Feature::Sqlite)
|
||||
|| !tokio::fs::try_exists(&state_path).await.unwrap_or(false)
|
||||
{
|
||||
|
|
@ -93,7 +92,7 @@ pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option
|
|||
///
|
||||
/// This is used for parity checks during the SQLite migration phase.
|
||||
pub async fn open_if_present(codex_home: &Path, default_provider: &str) -> Option<StateDbHandle> {
|
||||
let db_path = codex_home.join(STATE_DB_FILENAME);
|
||||
let db_path = codex_state::state_db_path(codex_home);
|
||||
if !tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
|
||||
return None;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ use codex_protocol::protocol::SessionMeta;
|
|||
use codex_protocol::protocol::SessionMetaLine;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_protocol::protocol::UserMessageEvent;
|
||||
use codex_state::STATE_DB_FILENAME;
|
||||
use core_test_support::load_sse_fixture_with_id;
|
||||
use core_test_support::responses;
|
||||
use core_test_support::responses::ev_completed;
|
||||
|
|
@ -39,7 +38,7 @@ async fn new_thread_is_recorded_in_state_db() -> Result<()> {
|
|||
|
||||
let thread_id = test.session_configured.session_id;
|
||||
let rollout_path = test.codex.rollout_path().expect("rollout path");
|
||||
let db_path = test.config.codex_home.join(STATE_DB_FILENAME);
|
||||
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());
|
||||
|
||||
for _ in 0..100 {
|
||||
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
|
||||
|
|
@ -149,7 +148,7 @@ async fn backfill_scans_existing_rollouts() -> Result<()> {
|
|||
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let db_path = test.config.codex_home.join(STATE_DB_FILENAME);
|
||||
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());
|
||||
let rollout_path = test.config.codex_home.join(&rollout_rel_path);
|
||||
let default_provider = test.config.model_provider_id.clone();
|
||||
|
||||
|
|
@ -205,7 +204,7 @@ async fn user_messages_persist_in_state_db() -> Result<()> {
|
|||
});
|
||||
let test = builder.build(&server).await?;
|
||||
|
||||
let db_path = test.config.codex_home.join(STATE_DB_FILENAME);
|
||||
let db_path = codex_state::state_db_path(test.config.codex_home.as_path());
|
||||
for _ in 0..100 {
|
||||
if tokio::fs::try_exists(&db_path).await.unwrap_or(false) {
|
||||
break;
|
||||
|
|
|
|||
|
|
@ -6,14 +6,13 @@ use chrono::DateTime;
|
|||
use clap::Parser;
|
||||
use codex_state::LogQuery;
|
||||
use codex_state::LogRow;
|
||||
use codex_state::STATE_DB_FILENAME;
|
||||
use codex_state::StateRuntime;
|
||||
use dirs::home_dir;
|
||||
use owo_colors::OwoColorize;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
#[command(name = "codex-state-logs")]
|
||||
#[command(about = "Tail Codex logs from state.sqlite with simple filters")]
|
||||
#[command(about = "Tail Codex logs from the state SQLite DB with simple filters")]
|
||||
struct Args {
|
||||
/// Path to CODEX_HOME. Defaults to $CODEX_HOME or ~/.codex.
|
||||
#[arg(long, env = "CODEX_HOME")]
|
||||
|
|
@ -104,7 +103,7 @@ fn resolve_db_path(args: &Args) -> anyhow::Result<PathBuf> {
|
|||
}
|
||||
|
||||
let codex_home = args.codex_home.clone().unwrap_or_else(default_codex_home);
|
||||
Ok(codex_home.join(STATE_DB_FILENAME))
|
||||
Ok(codex_state::state_db_path(codex_home.as_path()))
|
||||
}
|
||||
|
||||
fn default_codex_home() -> PathBuf {
|
||||
|
|
|
|||
|
|
@ -29,6 +29,9 @@ pub use model::ThreadMetadata;
|
|||
pub use model::ThreadMetadataBuilder;
|
||||
pub use model::ThreadsPage;
|
||||
pub use runtime::STATE_DB_FILENAME;
|
||||
pub use runtime::STATE_DB_VERSION;
|
||||
pub use runtime::state_db_filename;
|
||||
pub use runtime::state_db_path;
|
||||
|
||||
/// Errors encountered during DB operations. Tags: [stage]
|
||||
pub const DB_ERROR_METRIC: &str = "codex.db.error";
|
||||
|
|
|
|||
|
|
@ -35,7 +35,8 @@ use std::sync::Arc;
|
|||
use std::time::Duration;
|
||||
use tracing::warn;
|
||||
|
||||
pub const STATE_DB_FILENAME: &str = "state.sqlite";
|
||||
pub const STATE_DB_FILENAME: &str = "state";
|
||||
pub const STATE_DB_VERSION: u32 = 2;
|
||||
|
||||
const METRIC_DB_INIT: &str = "codex.db.init";
|
||||
|
||||
|
|
@ -56,7 +57,8 @@ impl StateRuntime {
|
|||
otel: Option<OtelManager>,
|
||||
) -> anyhow::Result<Arc<Self>> {
|
||||
tokio::fs::create_dir_all(&codex_home).await?;
|
||||
let state_path = codex_home.join(STATE_DB_FILENAME);
|
||||
remove_legacy_state_files(&codex_home).await;
|
||||
let state_path = state_db_path(codex_home.as_path());
|
||||
let existed = tokio::fs::try_exists(&state_path).await.unwrap_or(false);
|
||||
let pool = match open_sqlite(&state_path).await {
|
||||
Ok(db) => Arc::new(db),
|
||||
|
|
@ -624,6 +626,77 @@ async fn open_sqlite(path: &Path) -> anyhow::Result<SqlitePool> {
|
|||
Ok(pool)
|
||||
}
|
||||
|
||||
pub fn state_db_filename() -> String {
|
||||
format!("{STATE_DB_FILENAME}_{STATE_DB_VERSION}.sqlite")
|
||||
}
|
||||
|
||||
pub fn state_db_path(codex_home: &Path) -> PathBuf {
|
||||
codex_home.join(state_db_filename())
|
||||
}
|
||||
|
||||
async fn remove_legacy_state_files(codex_home: &Path) {
|
||||
let current_name = state_db_filename();
|
||||
let mut entries = match tokio::fs::read_dir(codex_home).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) => {
|
||||
warn!(
|
||||
"failed to read codex_home for state db cleanup {}: {err}",
|
||||
codex_home.display()
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
while let Ok(Some(entry)) = entries.next_entry().await {
|
||||
if !entry
|
||||
.file_type()
|
||||
.await
|
||||
.map(|file_type| file_type.is_file())
|
||||
.unwrap_or(false)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
if !should_remove_state_file(file_name.as_ref(), current_name.as_str()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let legacy_path = entry.path();
|
||||
if let Err(err) = tokio::fs::remove_file(&legacy_path).await {
|
||||
warn!(
|
||||
"failed to remove legacy state db file {}: {err}",
|
||||
legacy_path.display()
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn should_remove_state_file(file_name: &str, current_name: &str) -> bool {
|
||||
let mut base_name = file_name;
|
||||
for suffix in ["-wal", "-shm", "-journal"] {
|
||||
if let Some(stripped) = file_name.strip_suffix(suffix) {
|
||||
base_name = stripped;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if base_name == current_name {
|
||||
return false;
|
||||
}
|
||||
let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite");
|
||||
if base_name == unversioned_name {
|
||||
return true;
|
||||
}
|
||||
|
||||
let Some(version_with_extension) = base_name.strip_prefix(&format!("{STATE_DB_FILENAME}_"))
|
||||
else {
|
||||
return false;
|
||||
};
|
||||
let Some(version_suffix) = version_with_extension.strip_suffix(".sqlite") else {
|
||||
return false;
|
||||
};
|
||||
!version_suffix.is_empty() && version_suffix.chars().all(|ch| ch.is_ascii_digit())
|
||||
}
|
||||
|
||||
fn push_thread_filters<'a>(
|
||||
builder: &mut QueryBuilder<'a, Sqlite>,
|
||||
archived_only: bool,
|
||||
|
|
@ -692,3 +765,97 @@ fn push_thread_order_and_limit(
|
|||
builder.push(" LIMIT ");
|
||||
builder.push_bind(limit as i64);
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::STATE_DB_FILENAME;
|
||||
use super::STATE_DB_VERSION;
|
||||
use super::StateRuntime;
|
||||
use super::state_db_filename;
|
||||
use pretty_assertions::assert_eq;
|
||||
use std::path::PathBuf;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
||||
fn unique_temp_dir() -> PathBuf {
|
||||
let nanos = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.map_or(0, |duration| duration.as_nanos());
|
||||
std::env::temp_dir().join(format!("codex-state-runtime-test-{nanos}"))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn init_removes_legacy_state_db_files() {
|
||||
let codex_home = unique_temp_dir();
|
||||
tokio::fs::create_dir_all(&codex_home)
|
||||
.await
|
||||
.expect("create codex_home");
|
||||
|
||||
let current_name = state_db_filename();
|
||||
let previous_version = STATE_DB_VERSION.saturating_sub(1);
|
||||
let unversioned_name = format!("{STATE_DB_FILENAME}.sqlite");
|
||||
for suffix in ["", "-wal", "-shm", "-journal"] {
|
||||
let path = codex_home.join(format!("{unversioned_name}{suffix}"));
|
||||
tokio::fs::write(path, b"legacy")
|
||||
.await
|
||||
.expect("write legacy");
|
||||
let old_version_path = codex_home.join(format!(
|
||||
"{STATE_DB_FILENAME}_{previous_version}.sqlite{suffix}"
|
||||
));
|
||||
tokio::fs::write(old_version_path, b"old_version")
|
||||
.await
|
||||
.expect("write old version");
|
||||
}
|
||||
let unrelated_path = codex_home.join("state.sqlite_backup");
|
||||
tokio::fs::write(&unrelated_path, b"keep")
|
||||
.await
|
||||
.expect("write unrelated");
|
||||
let numeric_path = codex_home.join("123");
|
||||
tokio::fs::write(&numeric_path, b"keep")
|
||||
.await
|
||||
.expect("write numeric");
|
||||
|
||||
let _runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
for suffix in ["", "-wal", "-shm", "-journal"] {
|
||||
let legacy_path = codex_home.join(format!("{unversioned_name}{suffix}"));
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(&legacy_path)
|
||||
.await
|
||||
.expect("check legacy path"),
|
||||
false
|
||||
);
|
||||
let old_version_path = codex_home.join(format!(
|
||||
"{STATE_DB_FILENAME}_{previous_version}.sqlite{suffix}"
|
||||
));
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(&old_version_path)
|
||||
.await
|
||||
.expect("check old version path"),
|
||||
false
|
||||
);
|
||||
}
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(codex_home.join(current_name))
|
||||
.await
|
||||
.expect("check new db path"),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(&unrelated_path)
|
||||
.await
|
||||
.expect("check unrelated path"),
|
||||
true
|
||||
);
|
||||
assert_eq!(
|
||||
tokio::fs::try_exists(&numeric_path)
|
||||
.await
|
||||
.expect("check numeric path"),
|
||||
true
|
||||
);
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue