diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs
index 387dc9f4b..4fe9be184 100644
--- a/codex-rs/state/src/log_db.rs
+++ b/codex-rs/state/src/log_db.rs
@@ -41,8 +41,8 @@ use crate::LogEntry;
 use crate::StateRuntime;
 
 const LOG_QUEUE_CAPACITY: usize = 512;
-const LOG_BATCH_SIZE: usize = 64;
-const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250);
+const LOG_BATCH_SIZE: usize = 128;
+const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(2);
 const LOG_RETENTION_DAYS: i64 = 90;
 
 pub struct LogDbLayer {
diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs
index 42d235fc7..0be70a5b4 100644
--- a/codex-rs/state/src/runtime.rs
+++ b/codex-rs/state/src/runtime.rs
@@ -61,6 +61,7 @@ mod threads;
 // - one bucket per threadless (thread_id IS NULL) non-null process_uuid
 // - one bucket for threadless rows with process_uuid IS NULL
 const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
+const LOG_PARTITION_ROW_LIMIT: i64 = 1_000;
 
 #[derive(Clone)]
 pub struct StateRuntime {
diff --git a/codex-rs/state/src/runtime/logs.rs b/codex-rs/state/src/runtime/logs.rs
index 0a6798f11..88cbc22d5 100644
--- a/codex-rs/state/src/runtime/logs.rs
+++ b/codex-rs/state/src/runtime/logs.rs
@@ -76,6 +76,8 @@
         over_limit_threads_query.push("estimated_bytes");
         over_limit_threads_query.push(") > ");
         over_limit_threads_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
+        over_limit_threads_query.push(" OR COUNT(*) > ");
+        over_limit_threads_query.push_bind(LOG_PARTITION_ROW_LIMIT);
         let over_limit_thread_ids: Vec<String> = over_limit_threads_query
             .build()
             .fetch_all(&mut *tx)
             .await
@@ -103,7 +105,11 @@ WHERE id IN (
             ) OVER (
                 PARTITION BY thread_id
                 ORDER BY ts DESC, ts_nanos DESC, id DESC
-            ) AS cumulative_bytes
+            ) AS cumulative_bytes,
+            ROW_NUMBER() OVER (
+                PARTITION BY thread_id
+                ORDER BY ts DESC, ts_nanos DESC, id DESC
+            ) AS row_number
         FROM logs
         WHERE thread_id IN (
 "#,
@@ -122,6 +128,8 @@ WHERE id IN (
 "#,
         );
         prune_threads.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
+        prune_threads.push(" OR row_number > ");
+        prune_threads.push_bind(LOG_PARTITION_ROW_LIMIT);
         prune_threads.push("\n)");
         prune_threads.build().execute(&mut *tx).await?;
     }
@@ -150,6 +158,8 @@ WHERE id IN (
         over_limit_processes_query.push("estimated_bytes");
         over_limit_processes_query.push(") > ");
         over_limit_processes_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
+        over_limit_processes_query.push(" OR COUNT(*) > ");
+        over_limit_processes_query.push_bind(LOG_PARTITION_ROW_LIMIT);
         let over_limit_process_uuids: Vec<String> = over_limit_processes_query
             .build()
             .fetch_all(&mut *tx)
             .await
@@ -177,7 +187,11 @@ WHERE id IN (
             ) OVER (
                 PARTITION BY process_uuid
                 ORDER BY ts DESC, ts_nanos DESC, id DESC
-            ) AS cumulative_bytes
+            ) AS cumulative_bytes,
+            ROW_NUMBER() OVER (
+                PARTITION BY process_uuid
+                ORDER BY ts DESC, ts_nanos DESC, id DESC
+            ) AS row_number
         FROM logs
         WHERE thread_id IS NULL
             AND process_uuid IN (
@@ -197,6 +211,8 @@ WHERE id IN (
 "#,
         );
         prune_threadless_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
+        prune_threadless_process_logs.push(" OR row_number > ");
+        prune_threadless_process_logs.push_bind(LOG_PARTITION_ROW_LIMIT);
         prune_threadless_process_logs.push("\n)");
         prune_threadless_process_logs
             .build()
             .execute(&mut *tx)
             .await?;
@@ -210,15 +226,16 @@ WHERE id IN (
         let mut null_process_usage_query = QueryBuilder::<Sqlite>::new("SELECT SUM(");
         null_process_usage_query.push("estimated_bytes");
         null_process_usage_query.push(
-            ") AS total_bytes FROM logs WHERE thread_id IS NULL AND process_uuid IS NULL",
+            ") AS total_bytes, COUNT(*) AS row_count FROM logs WHERE thread_id IS NULL AND process_uuid IS NULL",
         );
-        let total_null_process_bytes: Option<i64> = null_process_usage_query
-            .build()
-            .fetch_one(&mut *tx)
-            .await?
-            .try_get("total_bytes")?;
+        let null_process_usage = null_process_usage_query.build().fetch_one(&mut *tx).await?;
+        let total_null_process_bytes: Option<i64> =
+            null_process_usage.try_get("total_bytes")?;
+        let null_process_row_count: i64 = null_process_usage.try_get("row_count")?;
 
-        if total_null_process_bytes.unwrap_or(0) > LOG_PARTITION_SIZE_LIMIT_BYTES {
+        if total_null_process_bytes.unwrap_or(0) > LOG_PARTITION_SIZE_LIMIT_BYTES
+            || null_process_row_count > LOG_PARTITION_ROW_LIMIT
+        {
             let mut prune_threadless_null_process_logs = QueryBuilder::<Sqlite>::new(
                 r#"
 DELETE FROM logs
@@ -236,7 +253,11 @@ WHERE id IN (
             ) OVER (
                 PARTITION BY process_uuid
                 ORDER BY ts DESC, ts_nanos DESC, id DESC
-            ) AS cumulative_bytes
+            ) AS cumulative_bytes,
+            ROW_NUMBER() OVER (
+                PARTITION BY process_uuid
+                ORDER BY ts DESC, ts_nanos DESC, id DESC
+            ) AS row_number
         FROM logs
         WHERE thread_id IS NULL
             AND process_uuid IS NULL
@@ -245,6 +266,8 @@ WHERE id IN (
 "#,
         );
         prune_threadless_null_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
+        prune_threadless_null_process_logs.push(" OR row_number > ");
+        prune_threadless_null_process_logs.push_bind(LOG_PARTITION_ROW_LIMIT);
         prune_threadless_null_process_logs.push("\n)");
         prune_threadless_null_process_logs
             .build()
             .execute(&mut *tx)
@@ -788,6 +811,140 @@ mod tests {
         let _ = tokio::fs::remove_dir_all(codex_home).await;
     }
 
+    #[tokio::test]
+    async fn insert_logs_prunes_old_rows_when_thread_exceeds_row_limit() {
+        let codex_home = unique_temp_dir();
+        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
+            .await
+            .expect("initialize runtime");
+
+        let entries: Vec<LogEntry> = (1..=1_001)
+            .map(|ts| LogEntry {
+                ts,
+                ts_nanos: 0,
+                level: "INFO".to_string(),
+                target: "cli".to_string(),
+                message: Some(format!("thread-row-{ts}")),
+                thread_id: Some("thread-row-limit".to_string()),
+                process_uuid: Some("proc-1".to_string()),
+                file: Some("main.rs".to_string()),
+                line: Some(ts),
+                module_path: Some("mod".to_string()),
+            })
+            .collect();
+        runtime
+            .insert_logs(&entries)
+            .await
+            .expect("insert test logs");
+
+        let rows = runtime
+            .query_logs(&LogQuery {
+                thread_ids: vec!["thread-row-limit".to_string()],
+                ..Default::default()
+            })
+            .await
+            .expect("query thread logs");
+
+        let timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect();
+        assert_eq!(timestamps.len(), 1_000);
+        assert_eq!(timestamps.first().copied(), Some(2));
+        assert_eq!(timestamps.last().copied(), Some(1_001));
+
+        let _ = tokio::fs::remove_dir_all(codex_home).await;
+    }
+
+    #[tokio::test]
+    async fn insert_logs_prunes_old_threadless_rows_when_process_exceeds_row_limit() {
+        let codex_home = unique_temp_dir();
+        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
+            .await
+            .expect("initialize runtime");
+
+        let entries: Vec<LogEntry> = (1..=1_001)
+            .map(|ts| LogEntry {
+                ts,
+                ts_nanos: 0,
+                level: "INFO".to_string(),
+                target: "cli".to_string(),
+                message: Some(format!("process-row-{ts}")),
+                thread_id: None,
+                process_uuid: Some("proc-row-limit".to_string()),
+                file: Some("main.rs".to_string()),
+                line: Some(ts),
+                module_path: Some("mod".to_string()),
+            })
+            .collect();
+        runtime
+            .insert_logs(&entries)
+            .await
+            .expect("insert test logs");
+
+        let rows = runtime
+            .query_logs(&LogQuery {
+                include_threadless: true,
+                ..Default::default()
+            })
+            .await
+            .expect("query threadless logs");
+
+        let timestamps: Vec<i64> = rows
+            .into_iter()
+            .filter(|row| row.process_uuid.as_deref() == Some("proc-row-limit"))
+            .map(|row| row.ts)
+            .collect();
+        assert_eq!(timestamps.len(), 1_000);
+        assert_eq!(timestamps.first().copied(), Some(2));
+        assert_eq!(timestamps.last().copied(), Some(1_001));
+
+        let _ = tokio::fs::remove_dir_all(codex_home).await;
+    }
+
+    #[tokio::test]
+    async fn insert_logs_prunes_old_threadless_null_process_rows_when_row_limit_exceeded() {
+        let codex_home = unique_temp_dir();
+        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
+            .await
+            .expect("initialize runtime");
+
+        let entries: Vec<LogEntry> = (1..=1_001)
+            .map(|ts| LogEntry {
+                ts,
+                ts_nanos: 0,
+                level: "INFO".to_string(),
+                target: "cli".to_string(),
+                message: Some(format!("null-process-row-{ts}")),
+                thread_id: None,
+                process_uuid: None,
+                file: Some("main.rs".to_string()),
+                line: Some(ts),
+                module_path: Some("mod".to_string()),
+            })
+            .collect();
+        runtime
+            .insert_logs(&entries)
+            .await
+            .expect("insert test logs");
+
+        let rows = runtime
+            .query_logs(&LogQuery {
+                include_threadless: true,
+                ..Default::default()
+            })
+            .await
+            .expect("query threadless logs");
+
+        let timestamps: Vec<i64> = rows
+            .into_iter()
+            .filter(|row| row.process_uuid.is_none())
+            .map(|row| row.ts)
+            .collect();
+        assert_eq!(timestamps.len(), 1_000);
+        assert_eq!(timestamps.first().copied(), Some(2));
+        assert_eq!(timestamps.last().copied(), Some(1_001));
+
+        let _ = tokio::fs::remove_dir_all(codex_home).await;
+    }
+
     #[tokio::test]
     async fn query_feedback_logs_returns_newest_lines_within_limit_in_order() {
         let codex_home = unique_temp_dir();