From d6631fb5a99087dd0fb7c37620e1f32977cc5c7e Mon Sep 17 00:00:00 2001 From: jif-oai Date: Thu, 29 Jan 2026 16:55:01 +0100 Subject: [PATCH] feat: add log retention and delete them after 90 days (#10151) --- codex-rs/state/src/log_db.rs | 14 +++++++++++++- codex-rs/state/src/runtime.rs | 8 ++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs index a62870974..345e90c6a 100644 --- a/codex-rs/state/src/log_db.rs +++ b/codex-rs/state/src/log_db.rs @@ -18,6 +18,8 @@ //! # } //! ``` +use chrono::Duration as ChronoDuration; +use chrono::Utc; use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -38,6 +40,7 @@ use crate::StateRuntime; const LOG_QUEUE_CAPACITY: usize = 512; const LOG_BATCH_SIZE: usize = 64; const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250); +const LOG_RETENTION_DAYS: i64 = 90; pub struct LogDbLayer { sender: mpsc::Sender, @@ -45,7 +48,8 @@ pub struct LogDbLayer { pub fn start(state_db: std::sync::Arc) -> LogDbLayer { let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY); - tokio::spawn(run_inserter(state_db, receiver)); + tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver)); + tokio::spawn(run_retention_cleanup(state_db)); LogDbLayer { sender } } @@ -229,6 +233,14 @@ async fn flush(state_db: &std::sync::Arc, buffer: &mut Vec) { + let Some(cutoff) = Utc::now().checked_sub_signed(ChronoDuration::days(LOG_RETENTION_DAYS)) + else { + return; + }; + let _ = state_db.delete_logs_before(cutoff.timestamp()).await; +} + #[derive(Default)] struct MessageVisitor { message: Option, diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 8dada3274..b61ea315c 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -231,6 +231,14 @@ FROM threads Ok(()) } + pub(crate) async fn delete_logs_before(&self, cutoff_ts: i64) -> anyhow::Result { + let result = sqlx::query("DELETE FROM logs WHERE ts < ?") + .bind(cutoff_ts) + .execute(self.pool.as_ref()) + .await?; + Ok(result.rows_affected()) + } + /// List thread ids using the underlying database (no rollout scanning). pub async fn list_thread_ids( &self,