From 6fbb89e8583d05ba00845e74593b39d5f66af5ce Mon Sep 17 00:00:00 2001 From: jif-oai Date: Wed, 14 Jan 2026 09:05:46 +0000 Subject: [PATCH] fix: shell snapshot clean-up (#9155) Clean all shell snapshot files corresponding to sessions that have not been updated in 7 days Those files should never leak. The only known cases were it can leak are during non graceful interrupt of the process (`kill -9, `panic`, OS crash, ...) --- codex-rs/core/src/codex.rs | 2 +- codex-rs/core/src/rollout/recorder.rs | 28 +++- codex-rs/core/src/shell_snapshot.rs | 201 +++++++++++++++++++++++++- 3 files changed, 215 insertions(+), 16 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5cf79bd90..8d9ef9c44 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -687,7 +687,7 @@ impl Session { // Create the mutable state for the Session. if config.features.enabled(Feature::ShellSnapshot) { default_shell.shell_snapshot = - ShellSnapshot::try_new(&config.codex_home, &default_shell) + ShellSnapshot::try_new(&config.codex_home, conversation_id, &default_shell) .await .map(Arc::new); } diff --git a/codex-rs/core/src/rollout/recorder.rs b/codex-rs/core/src/rollout/recorder.rs index 80d95e625..dfafb2911 100644 --- a/codex-rs/core/src/rollout/recorder.rs +++ b/codex-rs/core/src/rollout/recorder.rs @@ -1,10 +1,12 @@ //! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later. use std::fs::File; +use std::fs::FileTimes; use std::fs::{self}; use std::io::Error as IoError; use std::path::Path; use std::path::PathBuf; +use std::time::SystemTime; use codex_protocol::ThreadId; use serde_json::Value; @@ -151,14 +153,17 @@ impl RolloutRecorder { }), ) } - RolloutRecorderParams::Resume { path } => ( - tokio::fs::OpenOptions::new() - .append(true) - .open(&path) - .await?, - path, - None, - ), + RolloutRecorderParams::Resume { path } => { + touch_rollout_file(&path)?; + ( + tokio::fs::OpenOptions::new() + .append(true) + .open(&path) + .await?, + path, + None, + ) + } }; // Clone the cwd for the spawned task to collect git info asynchronously @@ -343,6 +348,13 @@ fn create_log_file(config: &Config, conversation_id: ThreadId) -> std::io::Resul }) } +fn touch_rollout_file(path: &Path) -> std::io::Result<()> { + let file = fs::OpenOptions::new().append(true).open(path)?; + let times = FileTimes::new().set_modified(SystemTime::now()); + file.set_times(times)?; + Ok(()) +} + async fn rollout_writer( file: tokio::fs::File, mut rx: mpsc::Receiver, diff --git a/codex-rs/core/src/shell_snapshot.rs b/codex-rs/core/src/shell_snapshot.rs index f16e1f03d..9c10eedf4 100644 --- a/codex-rs/core/src/shell_snapshot.rs +++ b/codex-rs/core/src/shell_snapshot.rs @@ -1,7 +1,10 @@ +use std::io::ErrorKind; use std::path::Path; use std::path::PathBuf; use std::time::Duration; +use std::time::SystemTime; +use crate::rollout::list::find_thread_path_by_id_str; use crate::shell::Shell; use crate::shell::ShellType; use crate::shell::get_shell; @@ -9,10 +12,10 @@ use anyhow::Context; use anyhow::Result; use anyhow::anyhow; use anyhow::bail; +use codex_protocol::ThreadId; use tokio::fs; use tokio::process::Command; use tokio::time::timeout; -use uuid::Uuid; #[derive(Clone, Debug, PartialEq, Eq)] pub struct ShellSnapshot { @@ -20,17 +23,30 @@ pub struct ShellSnapshot { } const SNAPSHOT_TIMEOUT: Duration = Duration::from_secs(10); +const SNAPSHOT_RETENTION: Duration = Duration::from_secs(60 * 60 * 24 * 7); // 7 days retention. +const SNAPSHOT_DIR: &str = "shell_snapshots"; impl ShellSnapshot { - pub async fn try_new(codex_home: &Path, shell: &Shell) -> Option { + pub async fn try_new(codex_home: &Path, session_id: ThreadId, shell: &Shell) -> Option { + // File to store the snapshot let extension = match shell.shell_type { ShellType::PowerShell => "ps1", _ => "sh", }; - let path = - codex_home - .join("shell_snapshots") - .join(format!("{}.{}", Uuid::new_v4(), extension)); + let path = codex_home + .join(SNAPSHOT_DIR) + .join(format!("{session_id}.{extension}")); + + // Clean the (unlikely) leaked snapshot files. + let codex_home = codex_home.to_path_buf(); + let cleanup_session_id = session_id; + tokio::spawn(async move { + if let Err(err) = cleanup_stale_snapshots(&codex_home, cleanup_session_id).await { + tracing::warn!("Failed to clean up shell snapshots: {err:?}"); + } + }); + + // Make the new snapshot. match write_shell_snapshot(shell.shell_type.clone(), &path).await { Ok(path) => { tracing::info!("Shell snapshot successfully created: {}", path.display()); @@ -261,10 +277,82 @@ $envVars | ForEach-Object { "## } +/// Removes shell snapshots that either lack a matching session rollout file or +/// whose rollouts have not been updated within the retention window. +/// The active session id is exempt from cleanup. +pub async fn cleanup_stale_snapshots(codex_home: &Path, active_session_id: ThreadId) -> Result<()> { + let snapshot_dir = codex_home.join(SNAPSHOT_DIR); + + let mut entries = match fs::read_dir(&snapshot_dir).await { + Ok(entries) => entries, + Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()), + Err(err) => return Err(err.into()), + }; + + let now = SystemTime::now(); + let active_session_id = active_session_id.to_string(); + + while let Some(entry) = entries.next_entry().await? { + if !entry.file_type().await?.is_file() { + continue; + } + + let path = entry.path(); + + let file_name = entry.file_name(); + let file_name = file_name.to_string_lossy(); + let (session_id, _) = match file_name.rsplit_once('.') { + Some((stem, ext)) => (stem, ext), + None => { + remove_snapshot_file(&path).await; + continue; + } + }; + if session_id == active_session_id { + continue; + } + + let rollout_path = find_thread_path_by_id_str(codex_home, session_id).await?; + let Some(rollout_path) = rollout_path else { + remove_snapshot_file(&path).await; + continue; + }; + + let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) { + Ok(modified) => modified, + Err(err) => { + tracing::warn!( + "Failed to check rollout age for snapshot {}: {err:?}", + path.display() + ); + continue; + } + }; + + if now + .duration_since(modified) + .ok() + .is_some_and(|age| age >= SNAPSHOT_RETENTION) + { + remove_snapshot_file(&path).await; + } + } + + Ok(()) +} + +async fn remove_snapshot_file(path: &Path) { + if let Err(err) = fs::remove_file(path).await { + tracing::warn!("Failed to delete shell snapshot at {:?}: {err:?}", path); + } +} + #[cfg(test)] mod tests { use super::*; use pretty_assertions::assert_eq; + #[cfg(unix)] + use std::os::unix::ffi::OsStrExt; #[cfg(target_os = "linux")] use std::os::unix::fs::PermissionsExt; #[cfg(target_os = "linux")] @@ -315,7 +403,7 @@ mod tests { shell_snapshot: None, }; - let snapshot = ShellSnapshot::try_new(dir.path(), &shell) + let snapshot = ShellSnapshot::try_new(dir.path(), ThreadId::new(), &shell) .await .expect("snapshot should be created"); let path = snapshot.path.clone(); @@ -425,4 +513,103 @@ mod tests { assert!(snapshot.contains("exports ")); Ok(()) } + + async fn write_rollout_stub(codex_home: &Path, session_id: ThreadId) -> Result { + let dir = codex_home + .join("sessions") + .join("2025") + .join("01") + .join("01"); + fs::create_dir_all(&dir).await?; + let path = dir.join(format!("rollout-2025-01-01T00-00-00-{session_id}.jsonl")); + fs::write(&path, "").await?; + Ok(path) + } + + #[tokio::test] + async fn cleanup_stale_snapshots_removes_orphans_and_keeps_live() -> Result<()> { + let dir = tempdir()?; + let codex_home = dir.path(); + let snapshot_dir = codex_home.join(SNAPSHOT_DIR); + fs::create_dir_all(&snapshot_dir).await?; + + let live_session = ThreadId::new(); + let orphan_session = ThreadId::new(); + let live_snapshot = snapshot_dir.join(format!("{live_session}.sh")); + let orphan_snapshot = snapshot_dir.join(format!("{orphan_session}.sh")); + let invalid_snapshot = snapshot_dir.join("not-a-snapshot.txt"); + + write_rollout_stub(codex_home, live_session).await?; + fs::write(&live_snapshot, "live").await?; + fs::write(&orphan_snapshot, "orphan").await?; + fs::write(&invalid_snapshot, "invalid").await?; + + cleanup_stale_snapshots(codex_home, ThreadId::new()).await?; + + assert_eq!(live_snapshot.exists(), true); + assert_eq!(orphan_snapshot.exists(), false); + assert_eq!(invalid_snapshot.exists(), false); + Ok(()) + } + + #[cfg(unix)] + #[tokio::test] + async fn cleanup_stale_snapshots_removes_stale_rollouts() -> Result<()> { + let dir = tempdir()?; + let codex_home = dir.path(); + let snapshot_dir = codex_home.join(SNAPSHOT_DIR); + fs::create_dir_all(&snapshot_dir).await?; + + let stale_session = ThreadId::new(); + let stale_snapshot = snapshot_dir.join(format!("{stale_session}.sh")); + let rollout_path = write_rollout_stub(codex_home, stale_session).await?; + fs::write(&stale_snapshot, "stale").await?; + + set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?; + + cleanup_stale_snapshots(codex_home, ThreadId::new()).await?; + + assert_eq!(stale_snapshot.exists(), false); + Ok(()) + } + + #[cfg(unix)] + #[tokio::test] + async fn cleanup_stale_snapshots_skips_active_session() -> Result<()> { + let dir = tempdir()?; + let codex_home = dir.path(); + let snapshot_dir = codex_home.join(SNAPSHOT_DIR); + fs::create_dir_all(&snapshot_dir).await?; + + let active_session = ThreadId::new(); + let active_snapshot = snapshot_dir.join(format!("{active_session}.sh")); + let rollout_path = write_rollout_stub(codex_home, active_session).await?; + fs::write(&active_snapshot, "active").await?; + + set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?; + + cleanup_stale_snapshots(codex_home, active_session).await?; + + assert_eq!(active_snapshot.exists(), true); + Ok(()) + } + + #[cfg(unix)] + fn set_file_mtime(path: &Path, age: Duration) -> Result<()> { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH)? + .as_secs() + .saturating_sub(age.as_secs()); + let tv_sec = now + .try_into() + .map_err(|_| anyhow!("Snapshot mtime is out of range for libc::timespec"))?; + let ts = libc::timespec { tv_sec, tv_nsec: 0 }; + let times = [ts, ts]; + let c_path = std::ffi::CString::new(path.as_os_str().as_bytes())?; + let result = unsafe { libc::utimensat(libc::AT_FDCWD, c_path.as_ptr(), times.as_ptr(), 0) }; + if result != 0 { + return Err(std::io::Error::last_os_error().into()); + } + Ok(()) + } }