feat: add log retention and delete them after 90 days (#10151)
This commit is contained in:
parent
89c5f3c4d4
commit
d6631fb5a9
2 changed files with 21 additions and 1 deletions
|
|
@ -18,6 +18,8 @@
|
|||
//! # }
|
||||
//! ```
|
||||
|
||||
use chrono::Duration as ChronoDuration;
|
||||
use chrono::Utc;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
|
@ -38,6 +40,7 @@ use crate::StateRuntime;
|
|||
const LOG_QUEUE_CAPACITY: usize = 512;
|
||||
const LOG_BATCH_SIZE: usize = 64;
|
||||
const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250);
|
||||
const LOG_RETENTION_DAYS: i64 = 90;
|
||||
|
||||
pub struct LogDbLayer {
|
||||
sender: mpsc::Sender<LogEntry>,
|
||||
|
|
@ -45,7 +48,8 @@ pub struct LogDbLayer {
|
|||
|
||||
pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
|
||||
let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY);
|
||||
tokio::spawn(run_inserter(state_db, receiver));
|
||||
tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver));
|
||||
tokio::spawn(run_retention_cleanup(state_db));
|
||||
|
||||
LogDbLayer { sender }
|
||||
}
|
||||
|
|
@ -229,6 +233,14 @@ async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntr
|
|||
let _ = state_db.insert_logs(entries.as_slice()).await;
|
||||
}
|
||||
|
||||
async fn run_retention_cleanup(state_db: std::sync::Arc<StateRuntime>) {
|
||||
let Some(cutoff) = Utc::now().checked_sub_signed(ChronoDuration::days(LOG_RETENTION_DAYS))
|
||||
else {
|
||||
return;
|
||||
};
|
||||
let _ = state_db.delete_logs_before(cutoff.timestamp()).await;
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct MessageVisitor {
|
||||
message: Option<String>,
|
||||
|
|
|
|||
|
|
@ -231,6 +231,14 @@ FROM threads
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn delete_logs_before(&self, cutoff_ts: i64) -> anyhow::Result<u64> {
|
||||
let result = sqlx::query("DELETE FROM logs WHERE ts < ?")
|
||||
.bind(cutoff_ts)
|
||||
.execute(self.pool.as_ref())
|
||||
.await?;
|
||||
Ok(result.rows_affected())
|
||||
}
|
||||
|
||||
/// List thread ids using the underlying database (no rollout scanning).
|
||||
pub async fn list_thread_ids(
|
||||
&self,
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue