feat: limit number of rows per log (#13763)
avoid DB explosion. This is a temp solution
This commit is contained in:
parent
f891f516a5
commit
c8f4b5bc1e
3 changed files with 170 additions and 12 deletions
|
|
@ -41,8 +41,8 @@ use crate::LogEntry;
|
|||
use crate::StateRuntime;
|
||||
|
||||
const LOG_QUEUE_CAPACITY: usize = 512;
|
||||
const LOG_BATCH_SIZE: usize = 64;
|
||||
const LOG_FLUSH_INTERVAL: Duration = Duration::from_millis(250);
|
||||
const LOG_BATCH_SIZE: usize = 128;
|
||||
const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(2);
|
||||
const LOG_RETENTION_DAYS: i64 = 90;
|
||||
|
||||
pub struct LogDbLayer {
|
||||
|
|
|
|||
|
|
@ -61,6 +61,7 @@ mod threads;
|
|||
// - one bucket per threadless (thread_id IS NULL) non-null process_uuid
|
||||
// - one bucket for threadless rows with process_uuid IS NULL
|
||||
const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024;
|
||||
const LOG_PARTITION_ROW_LIMIT: i64 = 1_000;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct StateRuntime {
|
||||
|
|
|
|||
|
|
@ -76,6 +76,8 @@ impl StateRuntime {
|
|||
over_limit_threads_query.push("estimated_bytes");
|
||||
over_limit_threads_query.push(") > ");
|
||||
over_limit_threads_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
over_limit_threads_query.push(" OR COUNT(*) > ");
|
||||
over_limit_threads_query.push_bind(LOG_PARTITION_ROW_LIMIT);
|
||||
let over_limit_thread_ids: Vec<String> = over_limit_threads_query
|
||||
.build()
|
||||
.fetch_all(&mut *tx)
|
||||
|
|
@ -103,7 +105,11 @@ WHERE id IN (
|
|||
) OVER (
|
||||
PARTITION BY thread_id
|
||||
ORDER BY ts DESC, ts_nanos DESC, id DESC
|
||||
) AS cumulative_bytes
|
||||
) AS cumulative_bytes,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY thread_id
|
||||
ORDER BY ts DESC, ts_nanos DESC, id DESC
|
||||
) AS row_number
|
||||
FROM logs
|
||||
WHERE thread_id IN (
|
||||
"#,
|
||||
|
|
@ -122,6 +128,8 @@ WHERE id IN (
|
|||
"#,
|
||||
);
|
||||
prune_threads.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
prune_threads.push(" OR row_number > ");
|
||||
prune_threads.push_bind(LOG_PARTITION_ROW_LIMIT);
|
||||
prune_threads.push("\n)");
|
||||
prune_threads.build().execute(&mut *tx).await?;
|
||||
}
|
||||
|
|
@ -150,6 +158,8 @@ WHERE id IN (
|
|||
over_limit_processes_query.push("estimated_bytes");
|
||||
over_limit_processes_query.push(") > ");
|
||||
over_limit_processes_query.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
over_limit_processes_query.push(" OR COUNT(*) > ");
|
||||
over_limit_processes_query.push_bind(LOG_PARTITION_ROW_LIMIT);
|
||||
let over_limit_process_uuids: Vec<String> = over_limit_processes_query
|
||||
.build()
|
||||
.fetch_all(&mut *tx)
|
||||
|
|
@ -177,7 +187,11 @@ WHERE id IN (
|
|||
) OVER (
|
||||
PARTITION BY process_uuid
|
||||
ORDER BY ts DESC, ts_nanos DESC, id DESC
|
||||
) AS cumulative_bytes
|
||||
) AS cumulative_bytes,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY process_uuid
|
||||
ORDER BY ts DESC, ts_nanos DESC, id DESC
|
||||
) AS row_number
|
||||
FROM logs
|
||||
WHERE thread_id IS NULL
|
||||
AND process_uuid IN (
|
||||
|
|
@ -197,6 +211,8 @@ WHERE id IN (
|
|||
"#,
|
||||
);
|
||||
prune_threadless_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
prune_threadless_process_logs.push(" OR row_number > ");
|
||||
prune_threadless_process_logs.push_bind(LOG_PARTITION_ROW_LIMIT);
|
||||
prune_threadless_process_logs.push("\n)");
|
||||
prune_threadless_process_logs
|
||||
.build()
|
||||
|
|
@ -210,15 +226,16 @@ WHERE id IN (
|
|||
let mut null_process_usage_query = QueryBuilder::<Sqlite>::new("SELECT SUM(");
|
||||
null_process_usage_query.push("estimated_bytes");
|
||||
null_process_usage_query.push(
|
||||
") AS total_bytes FROM logs WHERE thread_id IS NULL AND process_uuid IS NULL",
|
||||
") AS total_bytes, COUNT(*) AS row_count FROM logs WHERE thread_id IS NULL AND process_uuid IS NULL",
|
||||
);
|
||||
let total_null_process_bytes: Option<i64> = null_process_usage_query
|
||||
.build()
|
||||
.fetch_one(&mut *tx)
|
||||
.await?
|
||||
.try_get("total_bytes")?;
|
||||
let null_process_usage = null_process_usage_query.build().fetch_one(&mut *tx).await?;
|
||||
let total_null_process_bytes: Option<i64> =
|
||||
null_process_usage.try_get("total_bytes")?;
|
||||
let null_process_row_count: i64 = null_process_usage.try_get("row_count")?;
|
||||
|
||||
if total_null_process_bytes.unwrap_or(0) > LOG_PARTITION_SIZE_LIMIT_BYTES {
|
||||
if total_null_process_bytes.unwrap_or(0) > LOG_PARTITION_SIZE_LIMIT_BYTES
|
||||
|| null_process_row_count > LOG_PARTITION_ROW_LIMIT
|
||||
{
|
||||
let mut prune_threadless_null_process_logs = QueryBuilder::<Sqlite>::new(
|
||||
r#"
|
||||
DELETE FROM logs
|
||||
|
|
@ -236,7 +253,11 @@ WHERE id IN (
|
|||
) OVER (
|
||||
PARTITION BY process_uuid
|
||||
ORDER BY ts DESC, ts_nanos DESC, id DESC
|
||||
) AS cumulative_bytes
|
||||
) AS cumulative_bytes,
|
||||
ROW_NUMBER() OVER (
|
||||
PARTITION BY process_uuid
|
||||
ORDER BY ts DESC, ts_nanos DESC, id DESC
|
||||
) AS row_number
|
||||
FROM logs
|
||||
WHERE thread_id IS NULL
|
||||
AND process_uuid IS NULL
|
||||
|
|
@ -245,6 +266,8 @@ WHERE id IN (
|
|||
"#,
|
||||
);
|
||||
prune_threadless_null_process_logs.push_bind(LOG_PARTITION_SIZE_LIMIT_BYTES);
|
||||
prune_threadless_null_process_logs.push(" OR row_number > ");
|
||||
prune_threadless_null_process_logs.push_bind(LOG_PARTITION_ROW_LIMIT);
|
||||
prune_threadless_null_process_logs.push("\n)");
|
||||
prune_threadless_null_process_logs
|
||||
.build()
|
||||
|
|
@ -788,6 +811,140 @@ mod tests {
|
|||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_old_rows_when_thread_exceeds_row_limit() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let entries: Vec<LogEntry> = (1..=1_001)
|
||||
.map(|ts| LogEntry {
|
||||
ts,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(format!("thread-row-{ts}")),
|
||||
thread_id: Some("thread-row-limit".to_string()),
|
||||
process_uuid: Some("proc-1".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(ts),
|
||||
module_path: Some("mod".to_string()),
|
||||
})
|
||||
.collect();
|
||||
runtime
|
||||
.insert_logs(&entries)
|
||||
.await
|
||||
.expect("insert test logs");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
thread_ids: vec!["thread-row-limit".to_string()],
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query thread logs");
|
||||
|
||||
let timestamps: Vec<i64> = rows.into_iter().map(|row| row.ts).collect();
|
||||
assert_eq!(timestamps.len(), 1_000);
|
||||
assert_eq!(timestamps.first().copied(), Some(2));
|
||||
assert_eq!(timestamps.last().copied(), Some(1_001));
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_old_threadless_rows_when_process_exceeds_row_limit() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let entries: Vec<LogEntry> = (1..=1_001)
|
||||
.map(|ts| LogEntry {
|
||||
ts,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(format!("process-row-{ts}")),
|
||||
thread_id: None,
|
||||
process_uuid: Some("proc-row-limit".to_string()),
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(ts),
|
||||
module_path: Some("mod".to_string()),
|
||||
})
|
||||
.collect();
|
||||
runtime
|
||||
.insert_logs(&entries)
|
||||
.await
|
||||
.expect("insert test logs");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
include_threadless: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query threadless logs");
|
||||
|
||||
let timestamps: Vec<i64> = rows
|
||||
.into_iter()
|
||||
.filter(|row| row.process_uuid.as_deref() == Some("proc-row-limit"))
|
||||
.map(|row| row.ts)
|
||||
.collect();
|
||||
assert_eq!(timestamps.len(), 1_000);
|
||||
assert_eq!(timestamps.first().copied(), Some(2));
|
||||
assert_eq!(timestamps.last().copied(), Some(1_001));
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn insert_logs_prunes_old_threadless_null_process_rows_when_row_limit_exceeded() {
|
||||
let codex_home = unique_temp_dir();
|
||||
let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string(), None)
|
||||
.await
|
||||
.expect("initialize runtime");
|
||||
|
||||
let entries: Vec<LogEntry> = (1..=1_001)
|
||||
.map(|ts| LogEntry {
|
||||
ts,
|
||||
ts_nanos: 0,
|
||||
level: "INFO".to_string(),
|
||||
target: "cli".to_string(),
|
||||
message: Some(format!("null-process-row-{ts}")),
|
||||
thread_id: None,
|
||||
process_uuid: None,
|
||||
file: Some("main.rs".to_string()),
|
||||
line: Some(ts),
|
||||
module_path: Some("mod".to_string()),
|
||||
})
|
||||
.collect();
|
||||
runtime
|
||||
.insert_logs(&entries)
|
||||
.await
|
||||
.expect("insert test logs");
|
||||
|
||||
let rows = runtime
|
||||
.query_logs(&LogQuery {
|
||||
include_threadless: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.expect("query threadless logs");
|
||||
|
||||
let timestamps: Vec<i64> = rows
|
||||
.into_iter()
|
||||
.filter(|row| row.process_uuid.is_none())
|
||||
.map(|row| row.ts)
|
||||
.collect();
|
||||
assert_eq!(timestamps.len(), 1_000);
|
||||
assert_eq!(timestamps.first().copied(), Some(2));
|
||||
assert_eq!(timestamps.last().copied(), Some(1_001));
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(codex_home).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn query_feedback_logs_returns_newest_lines_within_limit_in_order() {
|
||||
let codex_home = unique_temp_dir();
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue