core-agent-ide/codex-rs/state/src/log_db.rs
Charley Cunningham ebbbc52ce4
Align SQLite feedback logs with feedback formatter (#13494)
## Summary
- store a pre-rendered `feedback_log_body` in SQLite so `/feedback`
exports keep span prefixes and structured event fields
- render SQLite feedback exports with timestamps and level prefixes to
match the old in-memory feedback formatter, while preserving existing
trailing newlines
- count `feedback_log_body` in the SQLite retention budget so structured
or span-prefixed rows still prune correctly
- bound `/feedback` row loading in SQL with the retention estimate, then
apply exact whole-line truncation in Rust so uploads stay capped without
splitting lines

## Details
- add a `feedback_log_body` column to `logs` and backfill it from
`message` for existing rows
- capture span names plus formatted span and event fields at write time,
since SQLite does not retain enough structure to reconstruct the old
formatter later
- keep SQLite feedback queries scoped to the requested thread plus
same-process threadless rows
- restore a SQL-side cumulative `estimated_bytes` cap for feedback
export queries so over-retained partitions do not load every matching
row before truncation
- add focused formatting coverage for exported feedback lines and parity
coverage against `tracing_subscriber`

## Testing
- cargo test -p codex-state
- just fix -p codex-state
- just fmt

codex author: `codex resume 019ca1b0-0ecc-78b1-85eb-6befdd7e4f1f`

---------

Co-authored-by: Codex <noreply@openai.com>
2026-03-18 22:44:31 +00:00

544 lines
17 KiB
Rust

//! Tracing log export into the state SQLite database.
//!
//! This module provides a `tracing_subscriber::Layer` that captures events and
//! inserts them into the dedicated `logs` SQLite database. The writer runs in a
//! background task and batches inserts to keep logging overhead low.
//!
//! ## Usage
//!
//! ```no_run
//! use codex_state::log_db;
//! use tracing_subscriber::prelude::*;
//!
//! # async fn example(state_db: std::sync::Arc<codex_state::StateRuntime>) {
//! let layer = log_db::start(state_db);
//! let _ = tracing_subscriber::registry()
//!     .with(layer)
//!     .try_init();
//! # }
//! ```
use chrono::Duration as ChronoDuration;
use chrono::Utc;
use std::sync::OnceLock;
use std::time::Duration;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tracing::Event;
use tracing::field::Field;
use tracing::field::Visit;
use tracing::span::Attributes;
use tracing::span::Id;
use tracing::span::Record;
use tracing_subscriber::Layer;
use tracing_subscriber::field::RecordFields;
use tracing_subscriber::fmt::FormatFields;
use tracing_subscriber::fmt::FormattedFields;
use tracing_subscriber::fmt::format::DefaultFields;
use tracing_subscriber::registry::LookupSpan;
use uuid::Uuid;
use crate::LogEntry;
use crate::StateRuntime;
/// Capacity of the command channel; when full, new entries are dropped by
/// the layer's `try_send` rather than blocking the logging call site.
const LOG_QUEUE_CAPACITY: usize = 512;
/// Number of buffered entries that triggers an immediate batch insert.
const LOG_BATCH_SIZE: usize = 128;
/// How often buffered entries are flushed even if the batch is not full.
const LOG_FLUSH_INTERVAL: Duration = Duration::from_secs(2);
/// Rows older than this many days are deleted by the retention cleanup task.
const LOG_RETENTION_DAYS: i64 = 10;
/// `tracing_subscriber` layer that forwards captured events to a background
/// SQLite writer task.
pub struct LogDbLayer {
    /// Channel to the background inserter task spawned by [`start`].
    sender: mpsc::Sender<LogDbCommand>,
    /// Identifier for this process (`pid:<pid>:<uuid>`), stamped on each entry.
    process_uuid: String,
}
/// Spawn the background inserter and the retention-cleanup task, returning a
/// layer that feeds captured events into them.
pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
    let (tx, rx) = mpsc::channel(LOG_QUEUE_CAPACITY);
    let inserter_db = std::sync::Arc::clone(&state_db);
    tokio::spawn(run_inserter(inserter_db, rx));
    tokio::spawn(run_retention_cleanup(state_db));
    LogDbLayer {
        sender: tx,
        process_uuid: current_process_log_uuid().to_string(),
    }
}
// Manual impl; both fields are `Clone`, so this is equivalent to a derive.
impl Clone for LogDbLayer {
    fn clone(&self) -> Self {
        let Self {
            sender,
            process_uuid,
        } = self;
        Self {
            sender: sender.clone(),
            process_uuid: process_uuid.clone(),
        }
    }
}
impl LogDbLayer {
    /// Ask the background writer to persist everything buffered so far, then
    /// wait for its acknowledgement.
    pub async fn flush(&self) {
        let (ack_tx, ack_rx) = oneshot::channel();
        let sent = self.sender.send(LogDbCommand::Flush(ack_tx)).await;
        if sent.is_ok() {
            // A dropped acknowledgement (writer shut down) is not an error.
            let _ = ack_rx.await;
        }
    }
}
impl<S> Layer<S> for LogDbLayer
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    /// Capture the span name, its formatted fields, and any `thread_id` field
    /// at creation time, so events inside the span can later be rendered with
    /// the same `span{fields}:` prefix the fmt layer produces.
    fn on_new_span(
        &self,
        attrs: &Attributes<'_>,
        id: &Id,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let mut visitor = SpanFieldVisitor::default();
        attrs.record(&mut visitor);
        if let Some(span) = ctx.span(id) {
            span.extensions_mut().insert(SpanLogContext {
                name: span.metadata().name().to_string(),
                formatted_fields: format_fields(attrs),
                thread_id: visitor.thread_id,
            });
        }
    }

    /// Merge fields recorded after span creation into the stored context,
    /// creating the context if `on_new_span` never ran for this span.
    fn on_record(
        &self,
        id: &Id,
        values: &Record<'_>,
        ctx: tracing_subscriber::layer::Context<'_, S>,
    ) {
        let mut visitor = SpanFieldVisitor::default();
        values.record(&mut visitor);
        if let Some(span) = ctx.span(id) {
            let mut extensions = span.extensions_mut();
            if let Some(log_context) = extensions.get_mut::<SpanLogContext>() {
                if let Some(thread_id) = visitor.thread_id {
                    log_context.thread_id = Some(thread_id);
                }
                append_fields(&mut log_context.formatted_fields, values);
            } else {
                extensions.insert(SpanLogContext {
                    name: span.metadata().name().to_string(),
                    formatted_fields: format_fields(values),
                    thread_id: visitor.thread_id,
                });
            }
        }
    }

    /// Convert an event into a [`LogEntry`] and enqueue it for the background
    /// writer. Enqueueing never blocks; if the queue is full the entry is
    /// silently dropped.
    fn on_event(&self, event: &Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) {
        let metadata = event.metadata();
        let mut visitor = MessageVisitor::default();
        event.record(&mut visitor);
        // Prefer an explicit `thread_id` event field, falling back to the
        // innermost enclosing span that recorded one. `take` moves the value
        // out (the old code cloned it); `visitor` is only read for `message`
        // afterwards.
        let thread_id = visitor
            .thread_id
            .take()
            .or_else(|| event_thread_id(event, &ctx));
        let feedback_log_body = format_feedback_log_body(event, &ctx);
        // A pre-epoch system clock yields a zero timestamp instead of panicking.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        let entry = LogEntry {
            ts: now.as_secs() as i64,
            ts_nanos: now.subsec_nanos() as i64,
            level: metadata.level().as_str().to_string(),
            target: metadata.target().to_string(),
            message: visitor.message,
            feedback_log_body: Some(feedback_log_body),
            thread_id,
            process_uuid: Some(self.process_uuid.clone()),
            module_path: metadata.module_path().map(ToString::to_string),
            file: metadata.file().map(ToString::to_string),
            line: metadata.line().map(i64::from),
        };
        // try_send keeps logging non-blocking; a full queue drops the entry.
        let _ = self.sender.try_send(LogDbCommand::Entry(Box::new(entry)));
    }
}
/// Commands accepted by the background writer task.
enum LogDbCommand {
    /// A captured log entry to buffer and eventually insert. Boxed to keep
    /// the channel message small.
    Entry(Box<LogEntry>),
    /// Flush the buffer now and acknowledge on the provided channel.
    Flush(oneshot::Sender<()>),
}
/// Per-span state stashed in the span's extensions by `on_new_span` /
/// `on_record`, used to rebuild the fmt-style prefix for feedback exports.
#[derive(Debug)]
struct SpanLogContext {
    /// Span name as reported by its metadata.
    name: String,
    /// Fields pre-rendered with the fmt `DefaultFields` formatter.
    formatted_fields: String,
    /// Value of a `thread_id` field recorded on this span, if any.
    thread_id: Option<String>,
}
/// Visitor that extracts only the `thread_id` field from span attributes
/// or records.
#[derive(Default)]
struct SpanFieldVisitor {
    /// First `thread_id` value seen, stringified.
    thread_id: Option<String>,
}
impl SpanFieldVisitor {
    /// Record a stringified field value, keeping only the first `thread_id`.
    fn record_field(&mut self, field: &Field, value: String) {
        if self.thread_id.is_none() && field.name() == "thread_id" {
            self.thread_id = Some(value);
        }
    }
}
/// Stringify every primitive field kind through the shared `record_field`.
impl Visit for SpanFieldVisitor {
    fn record_i64(&mut self, field: &Field, value: i64) {
        self.record_field(field, format!("{value}"));
    }
    fn record_u64(&mut self, field: &Field, value: u64) {
        self.record_field(field, format!("{value}"));
    }
    fn record_bool(&mut self, field: &Field, value: bool) {
        self.record_field(field, format!("{value}"));
    }
    fn record_f64(&mut self, field: &Field, value: f64) {
        self.record_field(field, format!("{value}"));
    }
    fn record_str(&mut self, field: &Field, value: &str) {
        self.record_field(field, value.to_owned());
    }
    fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
        // Errors are rendered via Display, matching their `to_string` output.
        self.record_field(field, format!("{value}"));
    }
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        self.record_field(field, format!("{value:?}"));
    }
}
/// Resolve the `thread_id` recorded on the innermost span enclosing `event`.
///
/// Walks the event scope leaf-to-root and returns the first recorded
/// `thread_id` — the same value the root-to-leaf "keep the last one" scan
/// would produce, but without visiting the remaining outer spans.
fn event_thread_id<S>(
    event: &Event<'_>,
    ctx: &tracing_subscriber::layer::Context<'_, S>,
) -> Option<String>
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    let scope = ctx.event_scope(event)?;
    for span in scope {
        let extensions = span.extensions();
        if let Some(log_context) = extensions.get::<SpanLogContext>()
            && let Some(thread_id) = &log_context.thread_id
        {
            return Some(thread_id.clone());
        }
    }
    None
}
/// Render the fmt-layer-style body for an event: `outer{f}:inner: fields`.
///
/// Span prefixes come from the stored [`SpanLogContext`] when available and
/// fall back to the span's metadata name; the event's own fields are appended
/// last via [`format_fields`].
fn format_feedback_log_body<S>(
    event: &Event<'_>,
    ctx: &tracing_subscriber::layer::Context<'_, S>,
) -> String
where
    S: tracing::Subscriber + for<'a> LookupSpan<'a>,
{
    let mut body = String::new();
    if let Some(scope) = ctx.event_scope(event) {
        for span in scope.from_root() {
            let extensions = span.extensions();
            match extensions.get::<SpanLogContext>() {
                Some(log_context) => {
                    body.push_str(&log_context.name);
                    if !log_context.formatted_fields.is_empty() {
                        body.push('{');
                        body.push_str(&log_context.formatted_fields);
                        body.push('}');
                    }
                }
                None => body.push_str(span.metadata().name()),
            }
            body.push(':');
        }
        // Separate the span prefix from the event fields with a single space.
        if !body.is_empty() {
            body.push(' ');
        }
    }
    body.push_str(&format_fields(event));
    body
}
/// Render `fields` exactly as the fmt layer's default field formatter would.
fn format_fields<R: RecordFields>(fields: R) -> String {
    let mut buffer = FormattedFields::<DefaultFields>::new(String::new());
    // Formatting into an in-memory String; the fmt::Result is ignored.
    let _ = DefaultFields::default().format_fields(buffer.as_writer(), fields);
    buffer.fields
}
/// Append newly recorded span fields to an existing pre-rendered field
/// string, reusing the fmt formatter so separators match `format_fields`.
fn append_fields(fields: &mut String, values: &Record<'_>) {
    let existing = std::mem::take(fields);
    let mut buffer = FormattedFields::<DefaultFields>::new(existing);
    let _ = DefaultFields::default().add_fields(&mut buffer, values);
    *fields = buffer.fields;
}
/// Process-wide `pid:<pid>:<uuid>` identifier, generated once per process.
fn current_process_log_uuid() -> &'static str {
    static PROCESS_LOG_UUID: OnceLock<String> = OnceLock::new();
    PROCESS_LOG_UUID
        .get_or_init(|| format!("pid:{}:{}", std::process::id(), Uuid::new_v4()))
        .as_str()
}
/// Background task that buffers incoming entries and inserts them in batches.
///
/// Flushes when the batch fills, when a `Flush` command arrives, on every
/// interval tick, and once more when the channel closes before exiting.
async fn run_inserter(
    state_db: std::sync::Arc<StateRuntime>,
    mut receiver: mpsc::Receiver<LogDbCommand>,
) {
    let mut pending = Vec::with_capacity(LOG_BATCH_SIZE);
    let mut flush_timer = tokio::time::interval(LOG_FLUSH_INTERVAL);
    loop {
        tokio::select! {
            maybe_command = receiver.recv() => {
                let Some(command) = maybe_command else {
                    // All senders dropped: final flush, then shut down.
                    flush(&state_db, &mut pending).await;
                    return;
                };
                match command {
                    LogDbCommand::Entry(entry) => {
                        pending.push(*entry);
                        if pending.len() >= LOG_BATCH_SIZE {
                            flush(&state_db, &mut pending).await;
                        }
                    }
                    LogDbCommand::Flush(ack) => {
                        flush(&state_db, &mut pending).await;
                        let _ = ack.send(());
                    }
                }
            }
            _ = flush_timer.tick() => {
                flush(&state_db, &mut pending).await;
            }
        }
    }
}
/// Insert all buffered entries in one batch, leaving `buffer` empty.
async fn flush(state_db: &std::sync::Arc<StateRuntime>, buffer: &mut Vec<LogEntry>) {
    if !buffer.is_empty() {
        // split_off(0) drains the entries while keeping `buffer`'s capacity.
        let batch = buffer.split_off(0);
        // Insert errors are ignored; logging is best-effort.
        let _ = state_db.insert_logs(batch.as_slice()).await;
    }
}
/// Delete log rows older than the retention window.
///
/// Runs a single cleanup pass per call; [`start`] spawns it once at startup.
async fn run_retention_cleanup(state_db: std::sync::Arc<StateRuntime>) {
    let retention = ChronoDuration::days(LOG_RETENTION_DAYS);
    if let Some(cutoff) = Utc::now().checked_sub_signed(retention) {
        let _ = state_db.delete_logs_before(cutoff.timestamp()).await;
    }
}
/// Visitor that extracts the `message` and `thread_id` fields from an event.
#[derive(Default)]
struct MessageVisitor {
    /// First `message` field value seen, stringified.
    message: Option<String>,
    /// First `thread_id` field value seen, stringified.
    thread_id: Option<String>,
}
impl MessageVisitor {
    /// Capture the first `message` and the first `thread_id` value seen.
    ///
    /// A field has exactly one name, so the two captures are mutually
    /// exclusive and `value` can be moved into whichever slot matches.
    /// (The previous version cloned `value` unconditionally for the
    /// `message` branch even though both branches can never fire.)
    fn record_field(&mut self, field: &Field, value: String) {
        match field.name() {
            "message" if self.message.is_none() => self.message = Some(value),
            "thread_id" if self.thread_id.is_none() => self.thread_id = Some(value),
            _ => {}
        }
    }
}
/// Stringify every primitive field kind through the shared `record_field`.
impl Visit for MessageVisitor {
    fn record_i64(&mut self, field: &Field, value: i64) {
        self.record_field(field, format!("{value}"));
    }
    fn record_u64(&mut self, field: &Field, value: u64) {
        self.record_field(field, format!("{value}"));
    }
    fn record_bool(&mut self, field: &Field, value: bool) {
        self.record_field(field, format!("{value}"));
    }
    fn record_f64(&mut self, field: &Field, value: f64) {
        self.record_field(field, format!("{value}"));
    }
    fn record_str(&mut self, field: &Field, value: &str) {
        self.record_field(field, value.to_owned());
    }
    fn record_error(&mut self, field: &Field, value: &(dyn std::error::Error + 'static)) {
        // Errors are rendered via Display, matching their `to_string` output.
        self.record_field(field, format!("{value}"));
    }
    fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
        self.record_field(field, format!("{value:?}"));
    }
}
#[cfg(test)]
mod tests {
    use std::io;
    use std::sync::Arc;
    use std::sync::Mutex;
    use std::time::Duration;
    use tokio::time::Instant;
    use tracing_subscriber::filter::Targets;
    use tracing_subscriber::fmt::writer::MakeWriter;
    use tracing_subscriber::layer::SubscriberExt;
    use tracing_subscriber::util::SubscriberInitExt;
    use super::*;

    /// In-memory writer shared between the fmt layer and test assertions.
    #[derive(Clone, Default)]
    struct SharedWriter {
        bytes: Arc<Mutex<Vec<u8>>>,
    }

    impl SharedWriter {
        /// Copy of everything written so far, decoded as UTF-8.
        fn snapshot(&self) -> String {
            String::from_utf8(self.bytes.lock().expect("writer mutex poisoned").clone())
                .expect("valid utf-8")
        }
    }

    /// Per-call writer handle returned by `SharedWriter::make_writer`.
    struct SharedWriterGuard {
        bytes: Arc<Mutex<Vec<u8>>>,
    }

    impl<'a> MakeWriter<'a> for SharedWriter {
        type Writer = SharedWriterGuard;
        fn make_writer(&'a self) -> Self::Writer {
            SharedWriterGuard {
                bytes: Arc::clone(&self.bytes),
            }
        }
    }

    impl io::Write for SharedWriterGuard {
        fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
            self.bytes
                .lock()
                .expect("writer mutex poisoned")
                .extend_from_slice(buf);
            Ok(buf.len())
        }
        fn flush(&mut self) -> io::Result<()> {
            Ok(())
        }
    }

    /// Parity test: the SQLite-backed feedback export must render the same
    /// lines (ignoring leading timestamps) as the `tracing_subscriber` fmt
    /// layer run side by side on the same events.
    #[tokio::test]
    async fn sqlite_feedback_logs_match_feedback_formatter_shape() {
        // Isolated state directory so parallel test runs do not collide.
        let codex_home =
            std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4()));
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
            .await
            .expect("initialize runtime");
        let writer = SharedWriter::default();
        // Register the fmt layer (reference output) and the SQLite layer
        // together so both observe the identical event stream.
        let subscriber = tracing_subscriber::registry()
            .with(
                tracing_subscriber::fmt::layer()
                    .with_writer(writer.clone())
                    .with_ansi(false)
                    .with_target(false)
                    .with_filter(Targets::new().with_default(tracing::Level::TRACE)),
            )
            .with(
                start(runtime.clone())
                    .with_filter(Targets::new().with_default(tracing::Level::TRACE)),
            );
        let guard = subscriber.set_default();
        // Mix of threadless events and one event inside a thread-tagged span.
        tracing::trace!("threadless-before");
        tracing::info_span!("feedback-thread", thread_id = "thread-1", turn = 1).in_scope(|| {
            tracing::info!(foo = 2, "thread-scoped");
        });
        tracing::debug!("threadless-after");
        drop(guard);
        let feedback_logs = writer.snapshot();
        // Strip the leading timestamp (first space-delimited token) per line,
        // since the two formatters stamp at slightly different instants.
        let without_timestamps = |logs: &str| {
            logs.lines()
                .map(|line| match line.split_once(' ') {
                    Some((_, rest)) => rest,
                    None => line,
                })
                .collect::<Vec<_>>()
                .join("\n")
        };
        // The SQLite writer batches asynchronously, so poll until it catches
        // up or the deadline expires.
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            let sqlite_logs = String::from_utf8(
                runtime
                    .query_feedback_logs("thread-1")
                    .await
                    .expect("query feedback logs"),
            )
            .expect("valid utf-8");
            if without_timestamps(&sqlite_logs) == without_timestamps(&feedback_logs) {
                break;
            }
            assert!(
                Instant::now() < deadline,
                "sqlite feedback logs did not match feedback formatter output before timeout\nsqlite:\n{sqlite_logs}\nfeedback:\n{feedback_logs}"
            );
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }

    /// `flush` must make buffered entries visible to queries immediately,
    /// without waiting for the periodic flush timer.
    #[tokio::test]
    async fn flush_persists_logs_for_query() {
        let codex_home =
            std::env::temp_dir().join(format!("codex-state-log-db-{}", Uuid::new_v4()));
        let runtime = StateRuntime::init(codex_home.clone(), "test-provider".to_string())
            .await
            .expect("initialize runtime");
        let layer = start(runtime.clone());
        let guard = tracing_subscriber::registry()
            .with(
                layer
                    .clone()
                    .with_filter(Targets::new().with_default(tracing::Level::TRACE)),
            )
            .set_default();
        tracing::info!("buffered-log");
        layer.flush().await;
        drop(guard);
        let after_flush = runtime
            .query_logs(&crate::LogQuery::default())
            .await
            .expect("query logs after flush");
        assert_eq!(after_flush.len(), 1);
        assert_eq!(after_flush[0].message.as_deref(), Some("buffered-log"));
        let _ = tokio::fs::remove_dir_all(codex_home).await;
    }
}