diff --git a/codex-rs/state/migrations/0010_logs_process_id.sql b/codex-rs/state/migrations/0010_logs_process_id.sql new file mode 100644 index 000000000..fdefcd53a --- /dev/null +++ b/codex-rs/state/migrations/0010_logs_process_id.sql @@ -0,0 +1,3 @@ +ALTER TABLE logs ADD COLUMN process_uuid TEXT; + +CREATE INDEX idx_logs_process_uuid ON logs(process_uuid); diff --git a/codex-rs/state/src/log_db.rs b/codex-rs/state/src/log_db.rs index 345e90c6a..64deeef2a 100644 --- a/codex-rs/state/src/log_db.rs +++ b/codex-rs/state/src/log_db.rs @@ -20,6 +20,7 @@ use chrono::Duration as ChronoDuration; use chrono::Utc; +use std::sync::OnceLock; use std::time::Duration; use std::time::SystemTime; use std::time::UNIX_EPOCH; @@ -33,6 +34,7 @@ use tracing::span::Id; use tracing::span::Record; use tracing_subscriber::Layer; use tracing_subscriber::registry::LookupSpan; +use uuid::Uuid; use crate::LogEntry; use crate::StateRuntime; @@ -44,14 +46,19 @@ const LOG_RETENTION_DAYS: i64 = 90; pub struct LogDbLayer { sender: mpsc::Sender, + process_uuid: String, } pub fn start(state_db: std::sync::Arc) -> LogDbLayer { + let process_uuid = current_process_log_uuid().to_string(); let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY); tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver)); tokio::spawn(run_retention_cleanup(state_db)); - LogDbLayer { sender } + LogDbLayer { + sender, + process_uuid, + } } impl Layer for LogDbLayer @@ -118,6 +125,7 @@ where target: metadata.target().to_string(), message: visitor.message, thread_id, + process_uuid: Some(self.process_uuid.clone()), module_path: metadata.module_path().map(ToString::to_string), file: metadata.file().map(ToString::to_string), line: metadata.line().map(|line| line as i64), @@ -196,6 +204,15 @@ where thread_id } +fn current_process_log_uuid() -> &'static str { + static PROCESS_LOG_UUID: OnceLock = OnceLock::new(); + PROCESS_LOG_UUID.get_or_init(|| { + let pid = std::process::id(); + let process_uuid = Uuid::new_v4(); + format!("pid:{pid}:{process_uuid}") + }) +} + async fn run_inserter( state_db: std::sync::Arc, mut receiver: mpsc::Receiver, diff --git a/codex-rs/state/src/model/log.rs b/codex-rs/state/src/model/log.rs index 819abb5d2..2aeaf2d5c 100644 --- a/codex-rs/state/src/model/log.rs +++ b/codex-rs/state/src/model/log.rs @@ -9,6 +9,7 @@ pub struct LogEntry { pub target: String, pub message: Option, pub thread_id: Option, + pub process_uuid: Option, pub module_path: Option, pub file: Option, pub line: Option, @@ -23,6 +24,7 @@ pub struct LogRow { pub target: String, pub message: Option, pub thread_id: Option, + pub process_uuid: Option, pub file: Option, pub line: Option, } diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 4cc9c63b1..3a743a9c7 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -360,7 +360,7 @@ FROM threads } let mut builder = QueryBuilder::::new( - "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, module_path, file, line) ", + "INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line) ", ); builder.push_values(entries, |mut row, entry| { row.push_bind(entry.ts) @@ -369,6 +369,7 @@ FROM threads .push_bind(&entry.target) .push_bind(&entry.message) .push_bind(&entry.thread_id) + .push_bind(&entry.process_uuid) .push_bind(&entry.module_path) .push_bind(&entry.file) .push_bind(entry.line); @@ -388,7 +389,7 @@ FROM threads /// Query logs with optional filters. pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result> { let mut builder = QueryBuilder::::new( - "SELECT id, ts, ts_nanos, level, target, message, thread_id, file, line FROM logs WHERE 1 = 1", + "SELECT id, ts, ts_nanos, level, target, message, thread_id, process_uuid, file, line FROM logs WHERE 1 = 1", ); push_log_filters(&mut builder, query); if query.descending {