From ebbbc52ce40324d6f47745fe6edf41f3a1cfbe48 Mon Sep 17 00:00:00 2001 From: Charley Cunningham Date: Wed, 18 Mar 2026 15:44:31 -0700 Subject: [PATCH] Align SQLite feedback logs with feedback formatter (#13494) ## Summary - store a pre-rendered `feedback_log_body` in SQLite so `/feedback` exports keep span prefixes and structured event fields - render SQLite feedback exports with timestamps and level prefixes to match the old in-memory feedback formatter, while preserving existing trailing newlines - count `feedback_log_body` in the SQLite retention budget so structured or span-prefixed rows still prune correctly - bound `/feedback` row loading in SQL with the retention estimate, then apply exact whole-line truncation in Rust so uploads stay capped without splitting lines ## Details - add a `feedback_log_body` column to `logs` and backfill it from `message` for existing rows - capture span names plus formatted span and event fields at write time, since SQLite does not retain enough structure to reconstruct the old formatter later - keep SQLite feedback queries scoped to the requested thread plus same-process threadless rows - restore a SQL-side cumulative `estimated_bytes` cap for feedback export queries so over-retained partitions do not load every matching row before truncation - add focused formatting coverage for exported feedback lines and parity coverage against `tracing_subscriber` ## Testing - cargo test -p codex-state - just fix -p codex-state - just fmt codex author: `codex resume 019ca1b0-0ecc-78b1-85eb-6befdd7e4f1f` --------- Co-authored-by: Codex --- codex-rs/core/tests/suite/sqlite_state.rs | 4 +- .../0002_logs_feedback_log_body.sql | 53 +++ codex-rs/state/src/bin/logs_client.rs | 6 +- codex-rs/state/src/log_db.rs | 103 ++++-- codex-rs/state/src/model/log.rs | 1 + codex-rs/state/src/runtime.rs | 4 +- codex-rs/state/src/runtime/logs.rs | 305 ++++++++++++++---- 7 files changed, 386 insertions(+), 90 deletions(-) create mode 100644 codex-rs/state/logs_migrations/0002_logs_feedback_log_body.sql diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 0252f3e08..620b9b508 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -482,7 +482,7 @@ async fn tool_call_logs_include_thread_id() -> Result<()> { if let Some(row) = rows.into_iter().find(|row| { row.message .as_deref() - .is_some_and(|m| m.starts_with("ToolCall:")) + .is_some_and(|m| m.contains("ToolCall:")) }) { let thread_id = row.thread_id; let message = row.message; @@ -497,7 +497,7 @@ async fn tool_call_logs_include_thread_id() -> Result<()> { assert!( message .as_deref() - .is_some_and(|text| text.starts_with("ToolCall:")), + .is_some_and(|text| text.contains("ToolCall:")), "expected ToolCall message, got {message:?}" ); diff --git a/codex-rs/state/logs_migrations/0002_logs_feedback_log_body.sql b/codex-rs/state/logs_migrations/0002_logs_feedback_log_body.sql new file mode 100644 index 000000000..6cd38664e --- /dev/null +++ b/codex-rs/state/logs_migrations/0002_logs_feedback_log_body.sql @@ -0,0 +1,53 @@ +ALTER TABLE logs RENAME TO logs_old; + +CREATE TABLE logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + ts INTEGER NOT NULL, + ts_nanos INTEGER NOT NULL, + level TEXT NOT NULL, + target TEXT NOT NULL, + feedback_log_body TEXT, + module_path TEXT, + file TEXT, + line INTEGER, + thread_id TEXT, + process_uuid TEXT, + estimated_bytes INTEGER NOT NULL DEFAULT 0 +); + +INSERT INTO logs ( + id, + ts, + ts_nanos, + level, + target, + feedback_log_body, + module_path, + file, + line, + thread_id, + process_uuid, + estimated_bytes +) +SELECT + id, + ts, + ts_nanos, + level, + target, + message, + module_path, + file, + line, + thread_id, + process_uuid, + estimated_bytes +FROM logs_old; + +DROP TABLE logs_old; + +CREATE INDEX idx_logs_ts ON logs(ts DESC, ts_nanos DESC, id DESC); +CREATE INDEX idx_logs_thread_id ON logs(thread_id); +CREATE INDEX idx_logs_thread_id_ts ON logs(thread_id, ts DESC, ts_nanos DESC, id DESC); +CREATE INDEX idx_logs_process_uuid_threadless_ts ON logs(process_uuid, ts DESC, ts_nanos DESC, id DESC) +WHERE thread_id IS NULL; diff --git a/codex-rs/state/src/bin/logs_client.rs b/codex-rs/state/src/bin/logs_client.rs index 66fc25895..6bfed4b23 100644 --- a/codex-rs/state/src/bin/logs_client.rs +++ b/codex-rs/state/src/bin/logs_client.rs @@ -46,7 +46,7 @@ struct Args { #[arg(long = "thread-id")] thread_id: Vec, - /// Substring match against the log message. + /// Substring match against the rendered log body. #[arg(long)] search: Option, @@ -62,7 +62,7 @@ struct Args { #[arg(long, default_value_t = 500)] poll_ms: u64, - /// Show compact output with only time, level, and message. + /// Show compact output with only time, level, and rendered log body. #[arg(long)] compact: bool, } @@ -295,7 +295,7 @@ fn heuristic_formatting(message: &str) -> String { mod matcher { pub(super) fn apply_patch(message: &str) -> bool { - message.starts_with("ToolCall: apply_patch") + message.contains("ToolCall: apply_patch") } } diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs index e533d3c89..8ec421665 100644 --- a/codex-rs/state/src/log_db.rs +++ b/codex-rs/state/src/log_db.rs @@ -34,6 +34,10 @@ use tracing::span::Attributes; use tracing::span::Id; use tracing::span::Record; use tracing_subscriber::Layer; +use tracing_subscriber::field::RecordFields; +use tracing_subscriber::fmt::FormatFields; +use tracing_subscriber::fmt::FormattedFields; +use tracing_subscriber::fmt::format::DefaultFields; use tracing_subscriber::registry::LookupSpan; use uuid::Uuid; @@ -95,6 +99,8 @@ where if let Some(span) = ctx.span(id) { span.extensions_mut().insert(SpanLogContext { + name: span.metadata().name().to_string(), + formatted_fields: format_fields(attrs), thread_id: visitor.thread_id, }); } @@ -109,16 +115,17 @@ where let mut visitor = SpanFieldVisitor::default(); values.record(&mut visitor); - if visitor.thread_id.is_none() { - return; - } - if let Some(span) = ctx.span(id) { let mut extensions = span.extensions_mut(); if let Some(log_context) = extensions.get_mut::() { - log_context.thread_id = visitor.thread_id; + if let Some(thread_id) = visitor.thread_id { + log_context.thread_id = Some(thread_id); + } + append_fields(&mut log_context.formatted_fields, values); } else { extensions.insert(SpanLogContext { + name: span.metadata().name().to_string(), + formatted_fields: format_fields(values), thread_id: visitor.thread_id, }); } @@ -133,6 +140,7 @@ where .thread_id .clone() .or_else(|| event_thread_id(event, &ctx)); + let feedback_log_body = format_feedback_log_body(event, &ctx); let now = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -143,6 +151,7 @@ where level: metadata.level().as_str().to_string(), target: metadata.target().to_string(), message: visitor.message, + feedback_log_body: Some(feedback_log_body), thread_id, process_uuid: Some(self.process_uuid.clone()), module_path: metadata.module_path().map(ToString::to_string), @@ -150,17 +159,19 @@ where line: metadata.line().map(|line| line as i64), }; - let _ = self.sender.try_send(LogDbCommand::Entry(entry)); + let _ = self.sender.try_send(LogDbCommand::Entry(Box::new(entry))); } } enum LogDbCommand { - Entry(LogEntry), + Entry(Box), Flush(oneshot::Sender<()>), } -#[derive(Clone, Debug, Default)] +#[derive(Debug)] struct SpanLogContext { + name: String, + formatted_fields: String, thread_id: Option, } @@ -228,6 +239,54 @@ where thread_id } +fn format_feedback_log_body( + event: &Event<'_>, + ctx: &tracing_subscriber::layer::Context<'_, S>, +) -> String +where + S: tracing::Subscriber + for<'a> LookupSpan<'a>, +{ + let mut feedback_log_body = String::new(); + if let Some(scope) = ctx.event_scope(event) { + for span in scope.from_root() { + let extensions = span.extensions(); + if let Some(log_context) = extensions.get::() { + feedback_log_body.push_str(&log_context.name); + if !log_context.formatted_fields.is_empty() { + feedback_log_body.push('{'); + feedback_log_body.push_str(&log_context.formatted_fields); + feedback_log_body.push('}'); + } + } else { + feedback_log_body.push_str(span.metadata().name()); + } + feedback_log_body.push(':'); + } + if !feedback_log_body.is_empty() { + feedback_log_body.push(' '); + } + } + feedback_log_body.push_str(&format_fields(event)); + feedback_log_body +} + +fn format_fields(fields: R) -> String +where + R: RecordFields, +{ + let formatter = DefaultFields::default(); + let mut formatted = FormattedFields::::new(String::new()); + let _ = formatter.format_fields(formatted.as_writer(), fields); + formatted.fields +} + +fn append_fields(fields: &mut String, values: &Record<'_>) { + let formatter = DefaultFields::default(); + let mut formatted = FormattedFields::::new(std::mem::take(fields)); + let _ = formatter.add_fields(&mut formatted, values); + *fields = formatted.fields; +} + fn current_process_log_uuid() -> &'static str { static PROCESS_LOG_UUID: OnceLock = OnceLock::new(); PROCESS_LOG_UUID.get_or_init(|| { @@ -248,7 +307,7 @@ async fn run_inserter( maybe_command = receiver.recv() => { match maybe_command { Some(LogDbCommand::Entry(entry)) => { - buffer.push(entry); + buffer.push(*entry); if buffer.len() >= LOG_BATCH_SIZE { flush(&state_db, &mut buffer).await; } @@ -401,7 +460,6 @@ mod tests { .with( tracing_subscriber::fmt::layer() .with_writer(writer.clone()) - .without_time() .with_ansi(false) .with_target(false) .with_filter(Targets::new().with_default(tracing::Level::TRACE)), @@ -413,30 +471,23 @@ mod tests { let guard = subscriber.set_default(); tracing::trace!("threadless-before"); - tracing::info_span!("feedback-thread", thread_id = "thread-1").in_scope(|| { - tracing::info!("thread-scoped"); + tracing::info_span!("feedback-thread", thread_id = "thread-1", turn = 1).in_scope(|| { + tracing::info!(foo = 2, "thread-scoped"); }); tracing::debug!("threadless-after"); drop(guard); - // SQLite exports now include timestamps, while this test writer has - // `.without_time()`. Compare bodies after stripping the SQLite prefix. - let feedback_logs = writer - .snapshot() - .replace("feedback-thread{thread_id=\"thread-1\"}: ", ""); - let strip_sqlite_timestamp = |logs: &str| { + let feedback_logs = writer.snapshot(); + let without_timestamps = |logs: &str| { logs.lines() - .map(|line| { - line.split_once(' ') - .map_or_else(|| line.to_string(), |(_, rest)| rest.to_string()) + .map(|line| match line.split_once(' ') { + Some((_, rest)) => rest, + None => line, }) .collect::>() + .join("\n") }; - let feedback_lines = feedback_logs - .lines() - .map(ToString::to_string) - .collect::>(); let deadline = Instant::now() + Duration::from_secs(2); loop { let sqlite_logs = String::from_utf8( @@ -446,7 +497,7 @@ mod tests { .expect("query feedback logs"), ) .expect("valid utf-8"); - if strip_sqlite_timestamp(&sqlite_logs) == feedback_lines { + if without_timestamps(&sqlite_logs) == without_timestamps(&feedback_logs) { break; } assert!( diff --git a/codex-rs/state/src/model/log.rs b/codex-rs/state/src/model/log.rs index cf973bcee..680486293 100644 --- a/codex-rs/state/src/model/log.rs +++ b/codex-rs/state/src/model/log.rs @@ -8,6 +8,7 @@ pub struct LogEntry { pub level: String, pub target: String, pub message: Option, + pub feedback_log_body: Option, pub thread_id: Option, pub process_uuid: Option, pub module_path: Option, diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 7ea6a53b2..645aa4269 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -57,10 +57,12 @@ mod memories; mod test_support; mod threads; -// "Partition" is the retention bucket we cap at 10 MiB: +// "Partition" is the retained-log-content bucket we cap at 10 MiB: // - one bucket per non-null thread_id // - one bucket per threadless (thread_id IS NULL) non-null process_uuid // - one bucket for threadless rows with process_uuid IS NULL +// This budget tracks each row's persisted rendered log body plus non-body +// metadata, rather than the exact sum of all persisted SQLite column bytes. const LOG_PARTITION_SIZE_LIMIT_BYTES: i64 = 10 * 1024 * 1024; const LOG_PARTITION_ROW_LIMIT: i64 = 1_000; diff --git a/codex-rs/state/src/runtime/logs.rs b/codex-rs/state/src/runtime/logs.rs index a2c2779a6..6c6009e96 100644 --- a/codex-rs/state/src/runtime/logs.rs +++ b/codex-rs/state/src/runtime/logs.rs @@ -13,10 +13,15 @@ impl StateRuntime { let mut tx = self.logs_pool.begin().await?; let mut builder = QueryBuilder::::new( - "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line, estimated_bytes) ", + "INSERT INTO logs (ts, ts_nanos, level, target, feedback_log_body, thread_id, process_uuid, module_path, file, line, estimated_bytes) ", ); builder.push_values(entries, |mut row, entry| { - let estimated_bytes = entry.message.as_ref().map_or(0, String::len) as i64 + let feedback_log_body = entry.feedback_log_body.as_ref().or(entry.message.as_ref()); + // Keep about 10 MiB of reader-visible log content per partition. + // Both `query_logs` and `/feedback` read the persisted + // `feedback_log_body`, while `LogEntry.message` is only a write-time + // fallback for callers that still populate the old field. + let estimated_bytes = feedback_log_body.map_or(0, String::len) as i64 + entry.level.len() as i64 + entry.target.len() as i64 + entry.module_path.as_ref().map_or(0, String::len) as i64 @@ -25,7 +30,7 @@ impl StateRuntime { .push_bind(entry.ts_nanos) .push_bind(&entry.level) .push_bind(&entry.target) - .push_bind(&entry.message) + .push_bind(feedback_log_body) .push_bind(&entry.thread_id) .push_bind(&entry.process_uuid) .push_bind(&entry.module_path) @@ -39,7 +44,7 @@ impl StateRuntime { Ok(()) } - /// Enforce per-partition log size caps after a successful batch insert. + /// Enforce per-partition retained-log-content caps after a successful batch insert. /// /// We maintain two independent budgets: /// - Thread logs: rows with `thread_id IS NOT NULL`, capped per `thread_id`. @@ -289,7 +294,7 @@ WHERE id IN ( /// Query logs with optional filters. pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result> { let mut builder = QueryBuilder::::new( - "SELECT id, ts, ts_nanos, level, target, message, thread_id, process_uuid, file, line FROM logs WHERE 1 = 1", + "SELECT id, ts, ts_nanos, level, target, feedback_log_body AS message, thread_id, process_uuid, file, line FROM logs WHERE 1 = 1", ); push_log_filters(&mut builder, query); if query.descending { @@ -310,10 +315,10 @@ WHERE id IN ( /// Query per-thread feedback logs, capped to the per-thread SQLite retention budget. pub async fn query_feedback_logs(&self, thread_id: &str) -> anyhow::Result> { - let max_bytes = LOG_PARTITION_SIZE_LIMIT_BYTES; - // TODO(ccunningham): Store rendered span/event fields in SQLite so this - // export can match feedback formatting beyond timestamp + level + message. - let lines = sqlx::query_scalar::<_, String>( + let max_bytes = usize::try_from(LOG_PARTITION_SIZE_LIMIT_BYTES).unwrap_or(usize::MAX); + // Bound the fetched rows in SQL first so over-retained partitions do not have to load + // every row into memory, then apply the exact whole-line byte cap after formatting. + let rows = sqlx::query_as::<_, FeedbackLogRow>( r#" WITH latest_process AS ( SELECT process_uuid @@ -323,64 +328,58 @@ WITH latest_process AS ( LIMIT 1 ), feedback_logs AS ( - SELECT - printf( - '%s.%06dZ %5s %s', - strftime('%Y-%m-%dT%H:%M:%S', ts, 'unixepoch'), - ts_nanos / 1000, - level, - message - ) || CASE - WHEN substr(message, -1, 1) = char(10) THEN '' - ELSE char(10) - END AS line, - length(CAST( - printf( - '%s.%06dZ %5s %s', - strftime('%Y-%m-%dT%H:%M:%S', ts, 'unixepoch'), - ts_nanos / 1000, - level, - message - ) || CASE - WHEN substr(message, -1, 1) = char(10) THEN '' - ELSE char(10) - END AS BLOB - )) AS line_bytes, - ts, - ts_nanos, - id + SELECT ts, ts_nanos, level, feedback_log_body, estimated_bytes, id FROM logs - WHERE message IS NOT NULL AND ( + WHERE feedback_log_body IS NOT NULL AND ( thread_id = ? OR ( thread_id IS NULL AND process_uuid IN (SELECT process_uuid FROM latest_process) ) ) -) -SELECT line -FROM ( +), +bounded_feedback_logs AS ( SELECT - line, ts, ts_nanos, + level, + feedback_log_body, id, - SUM(line_bytes) OVER ( + SUM(estimated_bytes) OVER ( ORDER BY ts DESC, ts_nanos DESC, id DESC - ) AS cumulative_bytes + ) AS cumulative_estimated_bytes FROM feedback_logs ) -WHERE cumulative_bytes <= ? -ORDER BY ts ASC, ts_nanos ASC, id ASC +SELECT ts, ts_nanos, level, feedback_log_body +FROM bounded_feedback_logs +WHERE cumulative_estimated_bytes <= ? +ORDER BY ts DESC, ts_nanos DESC, id DESC "#, ) .bind(thread_id) .bind(thread_id) - .bind(max_bytes) + .bind(LOG_PARTITION_SIZE_LIMIT_BYTES) .fetch_all(self.logs_pool.as_ref()) .await?; - Ok(lines.concat().into_bytes()) + let mut lines = Vec::new(); + let mut total_bytes = 0usize; + for row in rows { + let line = + format_feedback_log_line(row.ts, row.ts_nanos, &row.level, &row.feedback_log_body); + if total_bytes.saturating_add(line.len()) > max_bytes { + break; + } + total_bytes += line.len(); + lines.push(line); + } + + let mut ordered_bytes = Vec::with_capacity(total_bytes); + for line in lines.into_iter().rev() { + ordered_bytes.extend_from_slice(line.as_bytes()); + } + + Ok(ordered_bytes) } /// Return the max log id matching optional filters. @@ -394,6 +393,32 @@ ORDER BY ts ASC, ts_nanos ASC, id ASC } } +#[derive(sqlx::FromRow)] +struct FeedbackLogRow { + ts: i64, + ts_nanos: i64, + level: String, + feedback_log_body: String, +} + +fn format_feedback_log_line( + ts: i64, + ts_nanos: i64, + level: &str, + feedback_log_body: &str, +) -> String { + let nanos = u32::try_from(ts_nanos).unwrap_or(0); + let timestamp = match DateTime::::from_timestamp(ts, nanos) { + Some(dt) => dt.to_rfc3339_opts(chrono::SecondsFormat::Micros, true), + None => format!("{ts}.{ts_nanos:09}Z"), + }; + let mut line = format!("{timestamp} {level:>5} {feedback_log_body}"); + if !line.ends_with('\n') { + line.push('\n'); + } + line +} + fn push_log_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, query: &'a LogQuery) { if let Some(level_upper) = query.level_upper.as_ref() { builder @@ -431,7 +456,7 @@ fn push_log_filters<'a>(builder: &mut QueryBuilder<'a, Sqlite>, query: &'a LogQu builder.push(" AND id > ").push_bind(after_id); } if let Some(search) = query.search.as_ref() { - builder.push(" AND INSTR(message, "); + builder.push(" AND INSTR(COALESCE(feedback_log_body, ''), "); builder.push_bind(search.as_str()); builder.push(") > 0"); } @@ -462,14 +487,18 @@ fn push_like_filters<'a>( #[cfg(test)] mod tests { use super::StateRuntime; + use super::format_feedback_log_line; use super::test_support::unique_temp_dir; use crate::LogEntry; use crate::LogQuery; use crate::logs_db_path; + use crate::migrations::LOGS_MIGRATOR; use crate::state_db_path; use pretty_assertions::assert_eq; use sqlx::SqlitePool; + use sqlx::migrate::Migrator; use sqlx::sqlite::SqliteConnectOptions; + use std::borrow::Cow; use std::path::Path; async fn open_db_pool(path: &Path) -> SqlitePool { @@ -506,6 +535,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("dedicated-log-db".to_string()), + feedback_log_body: Some("dedicated-log-db".to_string()), thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), module_path: Some("mod".to_string()), @@ -525,7 +555,119 @@ mod tests { } #[tokio::test] - async fn query_logs_with_search_matches_substring() { + async fn init_migrates_message_only_logs_db_to_feedback_log_body_schema() { + let codex_home = unique_temp_dir(); + tokio::fs::create_dir_all(&codex_home) + .await + .expect("create codex home"); + let logs_path = logs_db_path(codex_home.as_path()); + let old_logs_migrator = Migrator { + migrations: Cow::Owned(vec![LOGS_MIGRATOR.migrations[0].clone()]), + ignore_missing: false, + locking: true, + no_tx: false, + }; + let pool = SqlitePool::connect_with( + SqliteConnectOptions::new() + .filename(&logs_path) + .create_if_missing(true), + ) + .await + .expect("open old logs db"); + old_logs_migrator + .run(&pool) + .await + .expect("apply old logs schema"); + sqlx::query( + "INSERT INTO logs (ts, ts_nanos, level, target, message, module_path, file, line, thread_id, process_uuid, estimated_bytes) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(1_i64) + .bind(0_i64) + .bind("INFO") + .bind("cli") + .bind("legacy-body") + .bind("mod") + .bind("main.rs") + .bind(7_i64) + .bind("thread-1") + .bind("proc-1") + .bind(16_i64) + .execute(&pool) + .await + .expect("insert legacy log row"); + pool.close().await; + + let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) + .await + .expect("initialize runtime"); + + let rows = runtime + .query_logs(&LogQuery::default()) + .await + .expect("query migrated logs"); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].message.as_deref(), Some("legacy-body")); + + let migrated_pool = open_db_pool(logs_path.as_path()).await; + let columns = sqlx::query_scalar::<_, String>("SELECT name FROM pragma_table_info('logs')") + .fetch_all(&migrated_pool) + .await + .expect("load migrated columns"); + assert_eq!( + columns, + vec![ + "id".to_string(), + "ts".to_string(), + "ts_nanos".to_string(), + "level".to_string(), + "target".to_string(), + "feedback_log_body".to_string(), + "module_path".to_string(), + "file".to_string(), + "line".to_string(), + "thread_id".to_string(), + "process_uuid".to_string(), + "estimated_bytes".to_string(), + ] + ); + let indexes = sqlx::query_scalar::<_, String>( + "SELECT name FROM pragma_index_list('logs') ORDER BY name", + ) + .fetch_all(&migrated_pool) + .await + .expect("load migrated indexes"); + assert_eq!( + indexes, + vec![ + "idx_logs_process_uuid_threadless_ts".to_string(), + "idx_logs_thread_id".to_string(), + "idx_logs_thread_id_ts".to_string(), + "idx_logs_ts".to_string(), + ] + ); + migrated_pool.close().await; + + let _ = tokio::fs::remove_dir_all(codex_home).await; + } + + #[test] + fn format_feedback_log_line_matches_feedback_formatter_shape() { + assert_eq!( + format_feedback_log_line(1, 123_456_000, "INFO", "alpha"), + "1970-01-01T00:00:01.123456Z INFO alpha\n" + ); + } + + #[test] + fn format_feedback_log_line_preserves_existing_trailing_newline() { + assert_eq!( + format_feedback_log_line(1, 123_456_000, "INFO", "alpha\n"), + "1970-01-01T00:00:01.123456Z INFO alpha\n" + ); + } + + #[tokio::test] + async fn query_logs_with_search_matches_rendered_body_substring() { let codex_home = unique_temp_dir(); let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string()) .await @@ -539,6 +681,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("alpha".to_string()), + feedback_log_body: Some("foo=1 alpha".to_string()), thread_id: Some("thread-1".to_string()), process_uuid: None, file: Some("main.rs".to_string()), @@ -551,6 +694,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("alphabet".to_string()), + feedback_log_body: Some("foo=2 alphabet".to_string()), thread_id: Some("thread-1".to_string()), process_uuid: None, file: Some("main.rs".to_string()), @@ -563,14 +707,14 @@ mod tests { let rows = runtime .query_logs(&LogQuery { - search: Some("alphab".to_string()), + search: Some("foo=2".to_string()), ..Default::default() }) .await .expect("query matching logs"); assert_eq!(rows.len(), 1); - assert_eq!(rows[0].message.as_deref(), Some("alphabet")); + assert_eq!(rows[0].message.as_deref(), Some("foo=2 alphabet")); let _ = tokio::fs::remove_dir_all(codex_home).await; } @@ -590,7 +734,8 @@ mod tests { ts_nanos: 0, level: "INFO".to_string(), target: "cli".to_string(), - message: Some(six_mebibytes.clone()), + message: Some("small".to_string()), + feedback_log_body: Some(six_mebibytes.clone()), thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), file: Some("main.rs".to_string()), @@ -602,7 +747,8 @@ mod tests { ts_nanos: 0, level: "INFO".to_string(), target: "cli".to_string(), - message: Some(six_mebibytes.clone()), + message: Some("small".to_string()), + feedback_log_body: Some(six_mebibytes.clone()), thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), file: Some("main.rs".to_string()), @@ -641,7 +787,8 @@ mod tests { ts_nanos: 0, level: "INFO".to_string(), target: "cli".to_string(), - message: Some(eleven_mebibytes), + message: Some("small".to_string()), + feedback_log_body: Some(eleven_mebibytes), thread_id: Some("thread-oversized".to_string()), process_uuid: Some("proc-1".to_string()), file: Some("main.rs".to_string()), @@ -680,6 +827,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(six_mebibytes.clone()), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-1".to_string()), file: Some("main.rs".to_string()), @@ -692,6 +840,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(six_mebibytes.clone()), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-1".to_string()), file: Some("main.rs".to_string()), @@ -704,6 +853,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(six_mebibytes), + feedback_log_body: None, thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), file: Some("main.rs".to_string()), @@ -744,7 +894,8 @@ mod tests { ts_nanos: 0, level: "INFO".to_string(), target: "cli".to_string(), - message: Some(eleven_mebibytes), + message: Some("small".to_string()), + feedback_log_body: Some(eleven_mebibytes), thread_id: None, process_uuid: Some("proc-oversized".to_string()), file: Some("main.rs".to_string()), @@ -783,6 +934,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(six_mebibytes.clone()), + feedback_log_body: None, thread_id: None, process_uuid: None, file: Some("main.rs".to_string()), @@ -795,6 +947,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(six_mebibytes), + feedback_log_body: None, thread_id: None, process_uuid: None, file: Some("main.rs".to_string()), @@ -807,6 +960,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("small".to_string()), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-1".to_string()), file: Some("main.rs".to_string()), @@ -846,7 +1000,8 @@ mod tests { ts_nanos: 0, level: "INFO".to_string(), target: "cli".to_string(), - message: Some(eleven_mebibytes), + message: Some("small".to_string()), + feedback_log_body: Some(eleven_mebibytes), thread_id: None, process_uuid: None, file: Some("main.rs".to_string()), @@ -883,6 +1038,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(format!("thread-row-{ts}")), + feedback_log_body: None, thread_id: Some("thread-row-limit".to_string()), process_uuid: Some("proc-1".to_string()), file: Some("main.rs".to_string()), @@ -925,6 +1081,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(format!("process-row-{ts}")), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-row-limit".to_string()), file: Some("main.rs".to_string()), @@ -971,6 +1128,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(format!("null-process-row-{ts}")), + feedback_log_body: None, thread_id: None, process_uuid: None, file: Some("main.rs".to_string()), @@ -1018,6 +1176,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("alpha".to_string()), + feedback_log_body: None, thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), file: None, @@ -1030,6 +1189,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("bravo".to_string()), + feedback_log_body: None, thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), file: None, @@ -1042,6 +1202,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("charlie".to_string()), + feedback_log_body: None, thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), file: None, @@ -1059,7 +1220,12 @@ mod tests { assert_eq!( String::from_utf8(bytes).expect("valid utf-8"), - "1970-01-01T00:00:01.000000Z INFO alpha\n1970-01-01T00:00:02.000000Z INFO bravo\n1970-01-01T00:00:03.000000Z INFO charlie\n" + [ + format_feedback_log_line(1, 0, "INFO", "alpha"), + format_feedback_log_line(2, 0, "INFO", "bravo"), + format_feedback_log_line(3, 0, "INFO", "charlie"), + ] + .concat() ); let _ = tokio::fs::remove_dir_all(codex_home).await; @@ -1081,6 +1247,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("small".to_string()), + feedback_log_body: None, thread_id: Some("thread-oversized".to_string()), process_uuid: Some("proc-1".to_string()), file: None, @@ -1093,6 +1260,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(eleven_mebibytes), + feedback_log_body: None, thread_id: Some("thread-oversized".to_string()), process_uuid: Some("proc-1".to_string()), file: None, @@ -1128,6 +1296,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("threadless-before".to_string()), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-1".to_string()), file: None, @@ -1140,6 +1309,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("thread-scoped".to_string()), + feedback_log_body: None, thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), file: None, @@ -1152,6 +1322,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("threadless-after".to_string()), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-1".to_string()), file: None, @@ -1164,6 +1335,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("other-process-threadless".to_string()), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-2".to_string()), file: None, @@ -1181,7 +1353,12 @@ mod tests { assert_eq!( String::from_utf8(bytes).expect("valid utf-8"), - "1970-01-01T00:00:01.000000Z INFO threadless-before\n1970-01-01T00:00:02.000000Z INFO thread-scoped\n1970-01-01T00:00:03.000000Z INFO threadless-after\n" + [ + format_feedback_log_line(1, 0, "INFO", "threadless-before"), + format_feedback_log_line(2, 0, "INFO", "thread-scoped"), + format_feedback_log_line(3, 0, "INFO", "threadless-after"), + ] + .concat() ); let _ = tokio::fs::remove_dir_all(codex_home).await; @@ -1202,6 +1379,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("old-process-threadless".to_string()), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-old".to_string()), file: None, @@ -1214,6 +1392,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("old-process-thread".to_string()), + feedback_log_body: None, thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-old".to_string()), file: None, @@ -1226,6 +1405,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("new-process-thread".to_string()), + feedback_log_body: None, thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-new".to_string()), file: None, @@ -1238,6 +1418,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some("new-process-threadless".to_string()), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-new".to_string()), file: None, @@ -1255,7 +1436,12 @@ mod tests { assert_eq!( String::from_utf8(bytes).expect("valid utf-8"), - "1970-01-01T00:00:02.000000Z INFO old-process-thread\n1970-01-01T00:00:03.000000Z INFO new-process-thread\n1970-01-01T00:00:04.000000Z INFO new-process-threadless\n" + [ + format_feedback_log_line(2, 0, "INFO", "old-process-thread"), + format_feedback_log_line(3, 0, "INFO", "new-process-thread"), + format_feedback_log_line(4, 0, "INFO", "new-process-threadless"), + ] + .concat() ); let _ = tokio::fs::remove_dir_all(codex_home).await; @@ -1285,6 +1471,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(one_mebibyte.clone()), + feedback_log_body: None, thread_id: Some("thread-1".to_string()), process_uuid: Some("proc-1".to_string()), file: None, @@ -1297,6 +1484,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(five_mebibytes), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-1".to_string()), file: None, @@ -1309,6 +1497,7 @@ mod tests { level: "INFO".to_string(), target: "cli".to_string(), message: Some(four_and_half_mebibytes), + feedback_log_body: None, thread_id: None, process_uuid: Some("proc-1".to_string()), file: None,