feat: async backfill (#10089)
This commit is contained in:
parent
780482da84
commit
714dc8d8bd
4 changed files with 58 additions and 27 deletions
|
|
@ -168,7 +168,7 @@ use codex_core::read_head_for_summary;
|
|||
use codex_core::read_session_meta_line;
|
||||
use codex_core::rollout_date_parts;
|
||||
use codex_core::sandboxing::SandboxPermissions;
|
||||
use codex_core::state_db::{self};
|
||||
use codex_core::state_db::get_state_db;
|
||||
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
|
||||
use codex_feedback::CodexFeedback;
|
||||
use codex_login::ServerOptions as LoginServerOptions;
|
||||
|
|
@ -1702,7 +1702,7 @@ impl CodexMessageProcessor {
|
|||
|
||||
let rollout_path_display = archived_path.display().to_string();
|
||||
let fallback_provider = self.config.model_provider_id.clone();
|
||||
let state_db_ctx = state_db::init_if_enabled(&self.config, None).await;
|
||||
let state_db_ctx = get_state_db(&self.config, None).await;
|
||||
let archived_folder = self
|
||||
.config
|
||||
.codex_home
|
||||
|
|
@ -3571,7 +3571,7 @@ impl CodexMessageProcessor {
|
|||
}
|
||||
|
||||
if state_db_ctx.is_none() {
|
||||
state_db_ctx = state_db::init_if_enabled(&self.config, None).await;
|
||||
state_db_ctx = get_state_db(&self.config, None).await;
|
||||
}
|
||||
|
||||
// Move the rollout file to archived.
|
||||
|
|
|
|||
|
|
@ -15,12 +15,14 @@ use codex_protocol::protocol::SessionMetaLine;
|
|||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_state::BackfillStats;
|
||||
use codex_state::DB_ERROR_METRIC;
|
||||
use codex_state::DB_METRIC_BACKFILL;
|
||||
use codex_state::ExtractionOutcome;
|
||||
use codex_state::ThreadMetadataBuilder;
|
||||
use codex_state::apply_rollout_item;
|
||||
use std::cmp::Reverse;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
|
||||
const ROLLOUT_PREFIX: &str = "rollout-";
|
||||
|
|
@ -125,7 +127,7 @@ pub(crate) async fn backfill_sessions(
|
|||
runtime: &codex_state::StateRuntime,
|
||||
config: &Config,
|
||||
otel: Option<&OtelManager>,
|
||||
) -> BackfillStats {
|
||||
) {
|
||||
let sessions_root = config.codex_home.join(rollout::SESSIONS_SUBDIR);
|
||||
let archived_root = config.codex_home.join(rollout::ARCHIVED_SESSIONS_SUBDIR);
|
||||
let mut rollout_paths: Vec<(PathBuf, bool)> = Vec::new();
|
||||
|
|
@ -191,7 +193,23 @@ pub(crate) async fn backfill_sessions(
|
|||
}
|
||||
}
|
||||
}
|
||||
stats
|
||||
|
||||
info!(
|
||||
"state db backfill scanned={}, upserted={}, failed={}",
|
||||
stats.scanned, stats.upserted, stats.failed
|
||||
);
|
||||
if let Some(otel) = otel {
|
||||
otel.counter(
|
||||
DB_METRIC_BACKFILL,
|
||||
stats.upserted as i64,
|
||||
&[("status", "upserted")],
|
||||
);
|
||||
otel.counter(
|
||||
DB_METRIC_BACKFILL,
|
||||
stats.failed as i64,
|
||||
&[("status", "failed")],
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn file_modified_time_utc(path: &Path) -> Option<DateTime<Utc>> {
|
||||
|
|
|
|||
|
|
@ -11,7 +11,6 @@ use codex_otel::OtelManager;
|
|||
use codex_protocol::ThreadId;
|
||||
use codex_protocol::protocol::RolloutItem;
|
||||
use codex_protocol::protocol::SessionSource;
|
||||
use codex_state::DB_METRIC_BACKFILL;
|
||||
pub use codex_state::LogEntry;
|
||||
use codex_state::STATE_DB_FILENAME;
|
||||
use codex_state::ThreadMetadataBuilder;
|
||||
|
|
@ -19,15 +18,18 @@ use serde_json::Value;
|
|||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use tracing::info;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Core-facing handle to the optional SQLite-backed state runtime.
|
||||
pub type StateDbHandle = Arc<codex_state::StateRuntime>;
|
||||
|
||||
/// Initialize the state runtime when the `sqlite` feature flag is enabled.
|
||||
pub async fn init_if_enabled(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
|
||||
/// Initialize the state runtime when the `sqlite` feature flag is enabled. To only be used
|
||||
/// inside `core`. The initialization should not be done anywhere else.
|
||||
pub(crate) async fn init_if_enabled(
|
||||
config: &Config,
|
||||
otel: Option<&OtelManager>,
|
||||
) -> Option<StateDbHandle> {
|
||||
let state_path = config.codex_home.join(STATE_DB_FILENAME);
|
||||
if !config.features.enabled(Feature::Sqlite) {
|
||||
// We delete the file on best effort basis to maintain retro-compatibility in the future.
|
||||
|
|
@ -59,27 +61,38 @@ pub async fn init_if_enabled(config: &Config, otel: Option<&OtelManager>) -> Opt
|
|||
}
|
||||
};
|
||||
if !existed {
|
||||
let stats = metadata::backfill_sessions(runtime.as_ref(), config, otel).await;
|
||||
info!(
|
||||
"state db backfill scanned={}, upserted={}, failed={}",
|
||||
stats.scanned, stats.upserted, stats.failed
|
||||
);
|
||||
if let Some(otel) = otel {
|
||||
otel.counter(
|
||||
DB_METRIC_BACKFILL,
|
||||
stats.upserted as i64,
|
||||
&[("status", "upserted")],
|
||||
);
|
||||
otel.counter(
|
||||
DB_METRIC_BACKFILL,
|
||||
stats.failed as i64,
|
||||
&[("status", "failed")],
|
||||
);
|
||||
}
|
||||
let runtime_for_backfill = Arc::clone(&runtime);
|
||||
let config_for_backfill = config.clone();
|
||||
let otel_for_backfill = otel.cloned();
|
||||
tokio::task::spawn(async move {
|
||||
metadata::backfill_sessions(
|
||||
runtime_for_backfill.as_ref(),
|
||||
&config_for_backfill,
|
||||
otel_for_backfill.as_ref(),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
}
|
||||
Some(runtime)
|
||||
}
|
||||
|
||||
/// Get the DB if the feature is enabled and the DB exists.
|
||||
pub async fn get_state_db(config: &Config, otel: Option<&OtelManager>) -> Option<StateDbHandle> {
|
||||
let state_path = config.codex_home.join(STATE_DB_FILENAME);
|
||||
if !config.features.enabled(Feature::Sqlite)
|
||||
|| !tokio::fs::try_exists(&state_path).await.unwrap_or(false)
|
||||
{
|
||||
return None;
|
||||
}
|
||||
codex_state::StateRuntime::init(
|
||||
config.codex_home.clone(),
|
||||
config.model_provider_id.clone(),
|
||||
otel.cloned(),
|
||||
)
|
||||
.await
|
||||
.ok()
|
||||
}
|
||||
|
||||
/// Open the state runtime when the SQLite file exists, without feature gating.
|
||||
///
|
||||
/// This is used for parity checks during the SQLite migration phase.
|
||||
|
|
|
|||
|
|
@ -352,7 +352,7 @@ pub async fn run_main(
|
|||
|
||||
let otel_tracing_layer = otel.as_ref().and_then(|o| o.tracing_layer());
|
||||
|
||||
let log_db_layer = codex_core::state_db::init_if_enabled(&config, None)
|
||||
let log_db_layer = codex_core::state_db::get_state_db(&config, None)
|
||||
.await
|
||||
.map(|db| log_db::start(db).with_filter(env_filter()));
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue