Add process_uuid to sqlite logs (#11534)
## Summary This PR is the first slice of the per-session `/feedback` logging work: it adds a process-unique identifier to SQLite log rows. It does **not** change `/feedback` sourcing behavior yet. ## Changes - Add migration `0010_logs_process_id.sql` to extend `logs` with: - `process_uuid TEXT` - `idx_logs_process_uuid` index - Extend state log models: - `LogEntry.process_uuid: Option<String>` - `LogRow.process_uuid: Option<String>` - Stamp each log row with a stable per-process UUID in the sqlite log layer: - generated once per process as `pid:<pid>:<uuid>` - Update sqlite log insert/query paths to persist and read `process_uuid`: - `INSERT INTO logs (..., process_uuid, ...)` - `SELECT ..., process_uuid, ... FROM logs` ## Why App-server runs many sessions in one process. This change provides a process-scoping primitive we need for follow-up `/feedback` work, so threadless/process-level logs can be associated with the emitting process without being mixed across processes. ## Non-goals in this PR - No `/feedback` transport/source changes - No attachment size changes - No sqlite retention/trim policy changes ## Testing - `just fmt` - CI will run the full checks
This commit is contained in:
parent
db6aa80195
commit
fce4ad9cf4
4 changed files with 26 additions and 3 deletions
3
codex-rs/state/migrations/0010_logs_process_id.sql
Normal file
3
codex-rs/state/migrations/0010_logs_process_id.sql
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
-- Add a nullable per-process identifier to log rows so rows emitted by one
-- process can be told apart from rows emitted by any other process.
-- Existing rows keep NULL; new rows are stamped by the sqlite log layer.
ALTER TABLE logs ADD COLUMN process_uuid TEXT;
-- Index the new column so process-scoped log queries stay cheap.
CREATE INDEX idx_logs_process_uuid ON logs(process_uuid);
|
||||
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
use chrono::Duration as ChronoDuration;
|
||||
use chrono::Utc;
|
||||
use std::sync::OnceLock;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
use std::time::UNIX_EPOCH;
|
||||
|
|
@ -33,6 +34,7 @@ use tracing::span::Id;
|
|||
use tracing::span::Record;
|
||||
use tracing_subscriber::Layer;
|
||||
use tracing_subscriber::registry::LookupSpan;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::LogEntry;
|
||||
use crate::StateRuntime;
|
||||
|
|
@ -44,14 +46,19 @@ const LOG_RETENTION_DAYS: i64 = 90;
|
|||
|
||||
/// Tracing layer that forwards log records into the SQLite-backed state store.
pub struct LogDbLayer {
    /// Bounded queue feeding the background inserter task.
    sender: mpsc::Sender<LogEntry>,
    /// Stable per-process identifier (`pid:<pid>:<uuid>`) stamped onto every
    /// row this layer emits, so rows can be scoped to the emitting process.
    process_uuid: String,
}
|
||||
|
||||
pub fn start(state_db: std::sync::Arc<StateRuntime>) -> LogDbLayer {
|
||||
let process_uuid = current_process_log_uuid().to_string();
|
||||
let (sender, receiver) = mpsc::channel(LOG_QUEUE_CAPACITY);
|
||||
tokio::spawn(run_inserter(std::sync::Arc::clone(&state_db), receiver));
|
||||
tokio::spawn(run_retention_cleanup(state_db));
|
||||
|
||||
LogDbLayer { sender }
|
||||
LogDbLayer {
|
||||
sender,
|
||||
process_uuid,
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for LogDbLayer
|
||||
|
|
@ -118,6 +125,7 @@ where
|
|||
target: metadata.target().to_string(),
|
||||
message: visitor.message,
|
||||
thread_id,
|
||||
process_uuid: Some(self.process_uuid.clone()),
|
||||
module_path: metadata.module_path().map(ToString::to_string),
|
||||
file: metadata.file().map(ToString::to_string),
|
||||
line: metadata.line().map(|line| line as i64),
|
||||
|
|
@ -196,6 +204,15 @@ where
|
|||
thread_id
|
||||
}
|
||||
|
||||
fn current_process_log_uuid() -> &'static str {
|
||||
static PROCESS_LOG_UUID: OnceLock<String> = OnceLock::new();
|
||||
PROCESS_LOG_UUID.get_or_init(|| {
|
||||
let pid = std::process::id();
|
||||
let process_uuid = Uuid::new_v4();
|
||||
format!("pid:{pid}:{process_uuid}")
|
||||
})
|
||||
}
|
||||
|
||||
async fn run_inserter(
|
||||
state_db: std::sync::Arc<StateRuntime>,
|
||||
mut receiver: mpsc::Receiver<LogEntry>,
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ pub struct LogEntry {
|
|||
pub target: String,
|
||||
pub message: Option<String>,
|
||||
pub thread_id: Option<String>,
|
||||
pub process_uuid: Option<String>,
|
||||
pub module_path: Option<String>,
|
||||
pub file: Option<String>,
|
||||
pub line: Option<i64>,
|
||||
|
|
@ -23,6 +24,7 @@ pub struct LogRow {
|
|||
pub target: String,
|
||||
pub message: Option<String>,
|
||||
pub thread_id: Option<String>,
|
||||
pub process_uuid: Option<String>,
|
||||
pub file: Option<String>,
|
||||
pub line: Option<i64>,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -360,7 +360,7 @@ FROM threads
|
|||
}
|
||||
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, module_path, file, line) ",
|
||||
"INSERT INTO logs (ts, ts_nanos, level, target, message, thread_id, process_uuid, module_path, file, line) ",
|
||||
);
|
||||
builder.push_values(entries, |mut row, entry| {
|
||||
row.push_bind(entry.ts)
|
||||
|
|
@ -369,6 +369,7 @@ FROM threads
|
|||
.push_bind(&entry.target)
|
||||
.push_bind(&entry.message)
|
||||
.push_bind(&entry.thread_id)
|
||||
.push_bind(&entry.process_uuid)
|
||||
.push_bind(&entry.module_path)
|
||||
.push_bind(&entry.file)
|
||||
.push_bind(entry.line);
|
||||
|
|
@ -388,7 +389,7 @@ FROM threads
|
|||
/// Query logs with optional filters.
|
||||
pub async fn query_logs(&self, query: &LogQuery) -> anyhow::Result<Vec<LogRow>> {
|
||||
let mut builder = QueryBuilder::<Sqlite>::new(
|
||||
"SELECT id, ts, ts_nanos, level, target, message, thread_id, file, line FROM logs WHERE 1 = 1",
|
||||
"SELECT id, ts, ts_nanos, level, target, message, thread_id, process_uuid, file, line FROM logs WHERE 1 = 1",
|
||||
);
|
||||
push_log_filters(&mut builder, query);
|
||||
if query.descending {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue