fix: shell snapshot clean-up (#9155)
Clean all shell snapshot files corresponding to sessions that have not been updated in 7 days Those files should never leak. The only known cases were it can leak are during non graceful interrupt of the process (`kill -9, `panic`, OS crash, ...)
This commit is contained in:
parent
258fc4b401
commit
6fbb89e858
3 changed files with 215 additions and 16 deletions
|
|
@ -687,7 +687,7 @@ impl Session {
|
|||
// Create the mutable state for the Session.
|
||||
if config.features.enabled(Feature::ShellSnapshot) {
|
||||
default_shell.shell_snapshot =
|
||||
ShellSnapshot::try_new(&config.codex_home, &default_shell)
|
||||
ShellSnapshot::try_new(&config.codex_home, conversation_id, &default_shell)
|
||||
.await
|
||||
.map(Arc::new);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,10 +1,12 @@
|
|||
//! Persist Codex session rollouts (.jsonl) so sessions can be replayed or inspected later.
|
||||
|
||||
use std::fs::File;
|
||||
use std::fs::FileTimes;
|
||||
use std::fs::{self};
|
||||
use std::io::Error as IoError;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use codex_protocol::ThreadId;
|
||||
use serde_json::Value;
|
||||
|
|
@ -151,14 +153,17 @@ impl RolloutRecorder {
|
|||
}),
|
||||
)
|
||||
}
|
||||
RolloutRecorderParams::Resume { path } => (
|
||||
tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.await?,
|
||||
path,
|
||||
None,
|
||||
),
|
||||
RolloutRecorderParams::Resume { path } => {
|
||||
touch_rollout_file(&path)?;
|
||||
(
|
||||
tokio::fs::OpenOptions::new()
|
||||
.append(true)
|
||||
.open(&path)
|
||||
.await?,
|
||||
path,
|
||||
None,
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
// Clone the cwd for the spawned task to collect git info asynchronously
|
||||
|
|
@ -343,6 +348,13 @@ fn create_log_file(config: &Config, conversation_id: ThreadId) -> std::io::Resul
|
|||
})
|
||||
}
|
||||
|
||||
fn touch_rollout_file(path: &Path) -> std::io::Result<()> {
|
||||
let file = fs::OpenOptions::new().append(true).open(path)?;
|
||||
let times = FileTimes::new().set_modified(SystemTime::now());
|
||||
file.set_times(times)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn rollout_writer(
|
||||
file: tokio::fs::File,
|
||||
mut rx: mpsc::Receiver<RolloutCmd>,
|
||||
|
|
|
|||
|
|
@ -1,7 +1,10 @@
|
|||
use std::io::ErrorKind;
|
||||
use std::path::Path;
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
use std::time::SystemTime;
|
||||
|
||||
use crate::rollout::list::find_thread_path_by_id_str;
|
||||
use crate::shell::Shell;
|
||||
use crate::shell::ShellType;
|
||||
use crate::shell::get_shell;
|
||||
|
|
@ -9,10 +12,10 @@ use anyhow::Context;
|
|||
use anyhow::Result;
|
||||
use anyhow::anyhow;
|
||||
use anyhow::bail;
|
||||
use codex_protocol::ThreadId;
|
||||
use tokio::fs;
|
||||
use tokio::process::Command;
|
||||
use tokio::time::timeout;
|
||||
use uuid::Uuid;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct ShellSnapshot {
|
||||
|
|
@ -20,17 +23,30 @@ pub struct ShellSnapshot {
|
|||
}
|
||||
|
||||
const SNAPSHOT_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
const SNAPSHOT_RETENTION: Duration = Duration::from_secs(60 * 60 * 24 * 7); // 7 days retention.
|
||||
const SNAPSHOT_DIR: &str = "shell_snapshots";
|
||||
|
||||
impl ShellSnapshot {
|
||||
pub async fn try_new(codex_home: &Path, shell: &Shell) -> Option<Self> {
|
||||
pub async fn try_new(codex_home: &Path, session_id: ThreadId, shell: &Shell) -> Option<Self> {
|
||||
// File to store the snapshot
|
||||
let extension = match shell.shell_type {
|
||||
ShellType::PowerShell => "ps1",
|
||||
_ => "sh",
|
||||
};
|
||||
let path =
|
||||
codex_home
|
||||
.join("shell_snapshots")
|
||||
.join(format!("{}.{}", Uuid::new_v4(), extension));
|
||||
let path = codex_home
|
||||
.join(SNAPSHOT_DIR)
|
||||
.join(format!("{session_id}.{extension}"));
|
||||
|
||||
// Clean the (unlikely) leaked snapshot files.
|
||||
let codex_home = codex_home.to_path_buf();
|
||||
let cleanup_session_id = session_id;
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = cleanup_stale_snapshots(&codex_home, cleanup_session_id).await {
|
||||
tracing::warn!("Failed to clean up shell snapshots: {err:?}");
|
||||
}
|
||||
});
|
||||
|
||||
// Make the new snapshot.
|
||||
match write_shell_snapshot(shell.shell_type.clone(), &path).await {
|
||||
Ok(path) => {
|
||||
tracing::info!("Shell snapshot successfully created: {}", path.display());
|
||||
|
|
@ -261,10 +277,82 @@ $envVars | ForEach-Object {
|
|||
"##
|
||||
}
|
||||
|
||||
/// Removes shell snapshots that either lack a matching session rollout file or
|
||||
/// whose rollouts have not been updated within the retention window.
|
||||
/// The active session id is exempt from cleanup.
|
||||
pub async fn cleanup_stale_snapshots(codex_home: &Path, active_session_id: ThreadId) -> Result<()> {
|
||||
let snapshot_dir = codex_home.join(SNAPSHOT_DIR);
|
||||
|
||||
let mut entries = match fs::read_dir(&snapshot_dir).await {
|
||||
Ok(entries) => entries,
|
||||
Err(err) if err.kind() == ErrorKind::NotFound => return Ok(()),
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
let now = SystemTime::now();
|
||||
let active_session_id = active_session_id.to_string();
|
||||
|
||||
while let Some(entry) = entries.next_entry().await? {
|
||||
if !entry.file_type().await?.is_file() {
|
||||
continue;
|
||||
}
|
||||
|
||||
let path = entry.path();
|
||||
|
||||
let file_name = entry.file_name();
|
||||
let file_name = file_name.to_string_lossy();
|
||||
let (session_id, _) = match file_name.rsplit_once('.') {
|
||||
Some((stem, ext)) => (stem, ext),
|
||||
None => {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if session_id == active_session_id {
|
||||
continue;
|
||||
}
|
||||
|
||||
let rollout_path = find_thread_path_by_id_str(codex_home, session_id).await?;
|
||||
let Some(rollout_path) = rollout_path else {
|
||||
remove_snapshot_file(&path).await;
|
||||
continue;
|
||||
};
|
||||
|
||||
let modified = match fs::metadata(&rollout_path).await.and_then(|m| m.modified()) {
|
||||
Ok(modified) => modified,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
"Failed to check rollout age for snapshot {}: {err:?}",
|
||||
path.display()
|
||||
);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if now
|
||||
.duration_since(modified)
|
||||
.ok()
|
||||
.is_some_and(|age| age >= SNAPSHOT_RETENTION)
|
||||
{
|
||||
remove_snapshot_file(&path).await;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn remove_snapshot_file(path: &Path) {
|
||||
if let Err(err) = fs::remove_file(path).await {
|
||||
tracing::warn!("Failed to delete shell snapshot at {:?}: {err:?}", path);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use pretty_assertions::assert_eq;
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::ffi::OsStrExt;
|
||||
#[cfg(target_os = "linux")]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
#[cfg(target_os = "linux")]
|
||||
|
|
@ -315,7 +403,7 @@ mod tests {
|
|||
shell_snapshot: None,
|
||||
};
|
||||
|
||||
let snapshot = ShellSnapshot::try_new(dir.path(), &shell)
|
||||
let snapshot = ShellSnapshot::try_new(dir.path(), ThreadId::new(), &shell)
|
||||
.await
|
||||
.expect("snapshot should be created");
|
||||
let path = snapshot.path.clone();
|
||||
|
|
@ -425,4 +513,103 @@ mod tests {
|
|||
assert!(snapshot.contains("exports "));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn write_rollout_stub(codex_home: &Path, session_id: ThreadId) -> Result<PathBuf> {
|
||||
let dir = codex_home
|
||||
.join("sessions")
|
||||
.join("2025")
|
||||
.join("01")
|
||||
.join("01");
|
||||
fs::create_dir_all(&dir).await?;
|
||||
let path = dir.join(format!("rollout-2025-01-01T00-00-00-{session_id}.jsonl"));
|
||||
fs::write(&path, "").await?;
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cleanup_stale_snapshots_removes_orphans_and_keeps_live() -> Result<()> {
|
||||
let dir = tempdir()?;
|
||||
let codex_home = dir.path();
|
||||
let snapshot_dir = codex_home.join(SNAPSHOT_DIR);
|
||||
fs::create_dir_all(&snapshot_dir).await?;
|
||||
|
||||
let live_session = ThreadId::new();
|
||||
let orphan_session = ThreadId::new();
|
||||
let live_snapshot = snapshot_dir.join(format!("{live_session}.sh"));
|
||||
let orphan_snapshot = snapshot_dir.join(format!("{orphan_session}.sh"));
|
||||
let invalid_snapshot = snapshot_dir.join("not-a-snapshot.txt");
|
||||
|
||||
write_rollout_stub(codex_home, live_session).await?;
|
||||
fs::write(&live_snapshot, "live").await?;
|
||||
fs::write(&orphan_snapshot, "orphan").await?;
|
||||
fs::write(&invalid_snapshot, "invalid").await?;
|
||||
|
||||
cleanup_stale_snapshots(codex_home, ThreadId::new()).await?;
|
||||
|
||||
assert_eq!(live_snapshot.exists(), true);
|
||||
assert_eq!(orphan_snapshot.exists(), false);
|
||||
assert_eq!(invalid_snapshot.exists(), false);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn cleanup_stale_snapshots_removes_stale_rollouts() -> Result<()> {
|
||||
let dir = tempdir()?;
|
||||
let codex_home = dir.path();
|
||||
let snapshot_dir = codex_home.join(SNAPSHOT_DIR);
|
||||
fs::create_dir_all(&snapshot_dir).await?;
|
||||
|
||||
let stale_session = ThreadId::new();
|
||||
let stale_snapshot = snapshot_dir.join(format!("{stale_session}.sh"));
|
||||
let rollout_path = write_rollout_stub(codex_home, stale_session).await?;
|
||||
fs::write(&stale_snapshot, "stale").await?;
|
||||
|
||||
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
|
||||
|
||||
cleanup_stale_snapshots(codex_home, ThreadId::new()).await?;
|
||||
|
||||
assert_eq!(stale_snapshot.exists(), false);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[tokio::test]
|
||||
async fn cleanup_stale_snapshots_skips_active_session() -> Result<()> {
|
||||
let dir = tempdir()?;
|
||||
let codex_home = dir.path();
|
||||
let snapshot_dir = codex_home.join(SNAPSHOT_DIR);
|
||||
fs::create_dir_all(&snapshot_dir).await?;
|
||||
|
||||
let active_session = ThreadId::new();
|
||||
let active_snapshot = snapshot_dir.join(format!("{active_session}.sh"));
|
||||
let rollout_path = write_rollout_stub(codex_home, active_session).await?;
|
||||
fs::write(&active_snapshot, "active").await?;
|
||||
|
||||
set_file_mtime(&rollout_path, SNAPSHOT_RETENTION + Duration::from_secs(60))?;
|
||||
|
||||
cleanup_stale_snapshots(codex_home, active_session).await?;
|
||||
|
||||
assert_eq!(active_snapshot.exists(), true);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
fn set_file_mtime(path: &Path, age: Duration) -> Result<()> {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)?
|
||||
.as_secs()
|
||||
.saturating_sub(age.as_secs());
|
||||
let tv_sec = now
|
||||
.try_into()
|
||||
.map_err(|_| anyhow!("Snapshot mtime is out of range for libc::timespec"))?;
|
||||
let ts = libc::timespec { tv_sec, tv_nsec: 0 };
|
||||
let times = [ts, ts];
|
||||
let c_path = std::ffi::CString::new(path.as_os_str().as_bytes())?;
|
||||
let result = unsafe { libc::utimensat(libc::AT_FDCWD, c_path.as_ptr(), times.as_ptr(), 0) };
|
||||
if result != 0 {
|
||||
return Err(std::io::Error::last_os_error().into());
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue