From 2c9be54c9a1d1229d7923f2ad8cd557681746fc4 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 10 Feb 2026 23:22:55 +0000 Subject: [PATCH] feat: mem v2 - PR5 (#11372) --- codex-rs/core/src/memories/layout.rs | 83 -------------- codex-rs/core/src/memories/mod.rs | 38 ++++++- codex-rs/core/src/memories/stage_one.rs | 10 +- .../core/src/memories/startup/dispatch.rs | 4 +- codex-rs/core/src/memories/startup/extract.rs | 2 +- codex-rs/core/src/memories/startup/mod.rs | 21 +--- .../memories/startup/{watch.rs => phase2.rs} | 0 codex-rs/core/src/memories/storage.rs | 31 +++--- codex-rs/core/src/memories/tests.rs | 55 ++------- codex-rs/core/src/memories/types.rs | 15 --- .../core/templates/memories/consolidation.md | 2 +- .../templates/memories/stage_one_input.md | 2 +- .../templates/memories/stage_one_system.md | 1 - ...d_stage1_outputs.sql => 0006_memories.sql} | 16 +-- .../state/migrations/0006_thread_memory.sql | 9 -- .../0009_memory_consolidation_locks.sql | 8 -- .../migrations/0010_memory_workflow_v2.sql | 85 -------------- codex-rs/state/src/lib.rs | 8 +- codex-rs/state/src/model/memories.rs | 105 ++++++++++++++++++ codex-rs/state/src/model/mod.rs | 10 +- codex-rs/state/src/model/stage1_output.rs | 56 ---------- codex-rs/state/src/runtime.rs | 61 ++-------- .../src/runtime/{memory.rs => memories.rs} | 19 ++-- 23 files changed, 208 insertions(+), 433 deletions(-) delete mode 100644 codex-rs/core/src/memories/layout.rs rename codex-rs/core/src/memories/startup/{watch.rs => phase2.rs} (100%) delete mode 100644 codex-rs/core/src/memories/types.rs rename codex-rs/state/migrations/{0011_generic_jobs_and_stage1_outputs.sql => 0006_memories.sql} (61%) delete mode 100644 codex-rs/state/migrations/0006_thread_memory.sql delete mode 100644 codex-rs/state/migrations/0009_memory_consolidation_locks.sql delete mode 100644 codex-rs/state/migrations/0010_memory_workflow_v2.sql create mode 100644 codex-rs/state/src/model/memories.rs delete mode 100644 
codex-rs/state/src/model/stage1_output.rs rename codex-rs/state/src/runtime/{memory.rs => memories.rs} (97%) diff --git a/codex-rs/core/src/memories/layout.rs b/codex-rs/core/src/memories/layout.rs deleted file mode 100644 index 07a8bd4ec..000000000 --- a/codex-rs/core/src/memories/layout.rs +++ /dev/null @@ -1,83 +0,0 @@ -use std::path::Path; -use std::path::PathBuf; - -pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries"; -pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md"; -pub(super) const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md"; -pub(super) const LEGACY_CONSOLIDATED_FILENAME: &str = "consolidated.md"; -pub(super) const SKILLS_SUBDIR: &str = "skills"; -const LEGACY_USER_SUBDIR: &str = "user"; -const LEGACY_MEMORY_SUBDIR: &str = "memory"; - -/// Returns the shared on-disk memory root directory. -pub(super) fn memory_root(codex_home: &Path) -> PathBuf { - codex_home.join("memories") -} - -pub(super) fn rollout_summaries_dir(root: &Path) -> PathBuf { - root.join(ROLLOUT_SUMMARIES_SUBDIR) -} - -pub(super) fn raw_memories_file(root: &Path) -> PathBuf { - root.join(RAW_MEMORIES_FILENAME) -} - -/// Migrates legacy user memory contents into the shared root when no shared-root -/// phase artifacts exist yet. -pub(super) async fn migrate_legacy_user_memory_root_if_needed( - codex_home: &Path, -) -> std::io::Result<()> { - let root = memory_root(codex_home); - let legacy = legacy_user_memory_root(codex_home); - - if !tokio::fs::try_exists(&legacy).await? || global_root_has_phase_artifacts(&root).await? { - return Ok(()); - } - - copy_dir_contents_if_missing(&legacy, &root).await -} - -/// Ensures the phase-1 memory directory layout exists for the given root. 
-pub(super) async fn ensure_layout(root: &Path) -> std::io::Result<()> { - tokio::fs::create_dir_all(rollout_summaries_dir(root)).await -} - -fn legacy_user_memory_root(codex_home: &Path) -> PathBuf { - codex_home - .join("memories") - .join(LEGACY_USER_SUBDIR) - .join(LEGACY_MEMORY_SUBDIR) -} - -async fn global_root_has_phase_artifacts(root: &Path) -> std::io::Result { - if tokio::fs::try_exists(&rollout_summaries_dir(root)).await? - || tokio::fs::try_exists(&raw_memories_file(root)).await? - || tokio::fs::try_exists(&root.join(MEMORY_REGISTRY_FILENAME)).await? - || tokio::fs::try_exists(&root.join(LEGACY_CONSOLIDATED_FILENAME)).await? - || tokio::fs::try_exists(&root.join(SKILLS_SUBDIR)).await? - { - return Ok(true); - } - Ok(false) -} - -fn copy_dir_contents_if_missing<'a>( - src_dir: &'a Path, - dst_dir: &'a Path, -) -> futures::future::BoxFuture<'a, std::io::Result<()>> { - Box::pin(async move { - tokio::fs::create_dir_all(dst_dir).await?; - let mut dir = tokio::fs::read_dir(src_dir).await?; - while let Some(entry) = dir.next_entry().await? { - let src_path = entry.path(); - let dst_path = dst_dir.join(entry.file_name()); - let metadata = entry.metadata().await?; - if metadata.is_dir() { - copy_dir_contents_if_missing(&src_path, &dst_path).await?; - } else if metadata.is_file() && !tokio::fs::try_exists(&dst_path).await? { - tokio::fs::copy(&src_path, &dst_path).await?; - } - } - Ok(()) - }) -} diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index cfda3f47e..10c50406a 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -4,20 +4,26 @@ //! - Phase 1: select rollouts, extract stage-1 raw memories, persist stage-1 outputs, and enqueue consolidation. //! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent. 
-mod layout; mod prompts; mod rollout; mod stage_one; mod startup; mod storage; mod text; -mod types; #[cfg(test)] mod tests; +use serde::Deserialize; +use std::path::Path; +use std::path::PathBuf; + /// Subagent source label used to identify consolidation tasks. const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation"; +const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries"; +const RAW_MEMORIES_FILENAME: &str = "raw_memories.md"; +const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md"; +const SKILLS_SUBDIR: &str = "skills"; /// Maximum number of rollout candidates processed per startup pass. const MAX_ROLLOUTS_PER_STARTUP: usize = 64; /// Concurrency cap for startup memory extraction and consolidation scheduling. @@ -39,6 +45,34 @@ const PHASE_TWO_JOB_RETRY_DELAY_SECONDS: i64 = 3_600; /// Heartbeat interval (seconds) for phase-2 running jobs. const PHASE_TWO_JOB_HEARTBEAT_SECONDS: u64 = 30; +/// Parsed stage-1 model output payload. +#[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] +struct StageOneOutput { + /// Detailed markdown raw memory for a single rollout. + #[serde(rename = "raw_memory")] + raw_memory: String, + /// Compact summary line used for routing and indexing. + #[serde(rename = "rollout_summary")] + rollout_summary: String, +} + +fn memory_root(codex_home: &Path) -> PathBuf { + codex_home.join("memories") +} + +fn rollout_summaries_dir(root: &Path) -> PathBuf { + root.join(ROLLOUT_SUMMARIES_SUBDIR) +} + +fn raw_memories_file(root: &Path) -> PathBuf { + root.join(RAW_MEMORIES_FILENAME) +} + +async fn ensure_layout(root: &Path) -> std::io::Result<()> { + tokio::fs::create_dir_all(rollout_summaries_dir(root)).await +} + /// Starts the memory startup pipeline for eligible root sessions. /// /// This is the single entrypoint that `codex` uses to trigger memory startup. 
diff --git a/codex-rs/core/src/memories/stage_one.rs b/codex-rs/core/src/memories/stage_one.rs index c9f2f650e..7f73fc262 100644 --- a/codex-rs/core/src/memories/stage_one.rs +++ b/codex-rs/core/src/memories/stage_one.rs @@ -5,9 +5,9 @@ use regex::Regex; use serde_json::Value; use serde_json::json; +use super::StageOneOutput; use super::text::compact_whitespace; use super::text::truncate_text_for_storage; -use super::types::StageOneOutput; /// System prompt for stage-1 raw memory extraction. pub(super) const RAW_MEMORY_PROMPT: &str = @@ -28,7 +28,6 @@ pub(super) fn stage_one_output_schema() -> Value { json!({ "type": "object", "properties": { - "rollout_slug": { "type": "string" }, "rollout_summary": { "type": "string" }, "raw_memory": { "type": "string" } }, @@ -97,12 +96,6 @@ fn parse_json_object_loose(raw: &str) -> Result { fn normalize_stage_one_output(mut output: StageOneOutput) -> Result { output.raw_memory = output.raw_memory.trim().to_string(); output.rollout_summary = output.rollout_summary.trim().to_string(); - if let Some(slug) = output.rollout_slug.take() { - let slug = slug.trim(); - if !slug.is_empty() { - output.rollout_slug = Some(slug.to_string()); - } - } if output.raw_memory.is_empty() { return Err(CodexErr::InvalidRequest( @@ -195,7 +188,6 @@ mod tests { fn normalize_stage_one_output_redacts_and_compacts_summary() { let output = StageOneOutput { raw_memory: "Token: sk-abcdefghijklmnopqrstuvwxyz123456\nBearer abcdefghijklmnopqrstuvwxyz012345".to_string(), - rollout_slug: None, rollout_summary: "password = mysecret123456\n\nsmall".to_string(), }; diff --git a/codex-rs/core/src/memories/startup/dispatch.rs b/codex-rs/core/src/memories/startup/dispatch.rs index c18b9b77d..b3b0ad26e 100644 --- a/codex-rs/core/src/memories/startup/dispatch.rs +++ b/codex-rs/core/src/memories/startup/dispatch.rs @@ -1,6 +1,6 @@ use crate::codex::Session; use crate::config::Config; -use crate::memories::layout::memory_root; +use crate::memories::memory_root; use 
codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use codex_protocol::user_input::UserInput; @@ -17,7 +17,7 @@ use super::super::prompts::build_consolidation_prompt; use super::super::storage::rebuild_raw_memories_file_from_memories; use super::super::storage::sync_rollout_summaries_from_memories; use super::super::storage::wipe_consolidation_outputs; -use super::watch::spawn_phase2_completion_task; +use super::phase2::spawn_phase2_completion_task; pub(super) async fn run_global_memory_consolidation( session: &Arc, diff --git a/codex-rs/core/src/memories/startup/extract.rs b/codex-rs/core/src/memories/startup/extract.rs index dd5f25854..d4e9c2312 100644 --- a/codex-rs/core/src/memories/startup/extract.rs +++ b/codex-rs/core/src/memories/startup/extract.rs @@ -12,13 +12,13 @@ use futures::StreamExt; use tracing::warn; use super::StageOneRequestContext; +use crate::memories::StageOneOutput; use crate::memories::prompts::build_stage_one_input_message; use crate::memories::rollout::StageOneRolloutFilter; use crate::memories::rollout::serialize_filtered_rollout_response_items; use crate::memories::stage_one::RAW_MEMORY_PROMPT; use crate::memories::stage_one::parse_stage_one_output; use crate::memories::stage_one::stage_one_output_schema; -use crate::memories::types::StageOneOutput; use std::path::Path; pub(super) async fn extract_stage_one_output( diff --git a/codex-rs/core/src/memories/startup/mod.rs b/codex-rs/core/src/memories/startup/mod.rs index 985d4d275..3608defa1 100644 --- a/codex-rs/core/src/memories/startup/mod.rs +++ b/codex-rs/core/src/memories/startup/mod.rs @@ -1,13 +1,12 @@ mod dispatch; mod extract; -mod watch; +mod phase2; use crate::codex::Session; use crate::codex::TurnContext; use crate::config::Config; use crate::error::Result as CodexResult; use crate::features::Feature; -use crate::memories::layout::migrate_legacy_user_memory_root_if_needed; use crate::rollout::INTERACTIVE_SESSION_SOURCES; use 
codex_otel::OtelManager; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; @@ -15,7 +14,6 @@ use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::protocol::SessionSource; use futures::StreamExt; -use serde_json::Value; use std::sync::Arc; use tracing::info; use tracing::warn; @@ -80,10 +78,6 @@ pub(super) async fn run_memories_startup_pipeline( session: &Arc, config: Arc, ) -> CodexResult<()> { - if let Err(err) = migrate_legacy_user_memory_root_if_needed(&config.codex_home).await { - warn!("failed migrating legacy shared memory root: {err}"); - } - let Some(state_db) = session.services.state_db.as_deref() else { warn!("state db unavailable for memories startup pipeline; skipping"); return Ok(()); @@ -91,11 +85,7 @@ pub(super) async fn run_memories_startup_pipeline( let allowed_sources = INTERACTIVE_SESSION_SOURCES .iter() - .map(|value| match serde_json::to_value(value) { - Ok(Value::String(s)) => s, - Ok(other) => other.to_string(), - Err(_) => String::new(), - }) + .map(ToString::to_string) .collect::>(); let claimed_candidates = match state_db @@ -186,7 +176,8 @@ pub(super) async fn run_memories_startup_pipeline( claimed_count, succeeded_count ); - let consolidation_job_count = run_consolidation_dispatch(session, config).await; + let consolidation_job_count = + usize::from(dispatch::run_global_memory_consolidation(session, config).await); info!( "memory consolidation dispatch complete: {} job(s) scheduled", consolidation_job_count @@ -194,7 +185,3 @@ pub(super) async fn run_memories_startup_pipeline( Ok(()) } - -async fn run_consolidation_dispatch(session: &Arc, config: Arc) -> usize { - usize::from(dispatch::run_global_memory_consolidation(session, config).await) -} diff --git a/codex-rs/core/src/memories/startup/watch.rs b/codex-rs/core/src/memories/startup/phase2.rs similarity index 100% rename from codex-rs/core/src/memories/startup/watch.rs 
rename to codex-rs/core/src/memories/startup/phase2.rs diff --git a/codex-rs/core/src/memories/storage.rs b/codex-rs/core/src/memories/storage.rs index 049130a5c..aa14edf66 100644 --- a/codex-rs/core/src/memories/storage.rs +++ b/codex-rs/core/src/memories/storage.rs @@ -5,13 +5,12 @@ use std::path::Path; use tracing::warn; use super::MAX_RAW_MEMORIES_FOR_GLOBAL; +use super::MEMORY_REGISTRY_FILENAME; +use super::SKILLS_SUBDIR; +use super::ensure_layout; +use super::raw_memories_file; +use super::rollout_summaries_dir; use super::text::compact_whitespace; -use crate::memories::layout::LEGACY_CONSOLIDATED_FILENAME; -use crate::memories::layout::MEMORY_REGISTRY_FILENAME; -use crate::memories::layout::SKILLS_SUBDIR; -use crate::memories::layout::ensure_layout; -use crate::memories::layout::raw_memories_file; -use crate::memories::layout::rollout_summaries_dir; /// Rebuild `raw_memories.md` from DB-backed stage-1 outputs. pub(super) async fn rebuild_raw_memories_file_from_memories( @@ -49,16 +48,14 @@ pub(super) async fn sync_rollout_summaries_from_memories( /// /// Phase-1 artifacts (`rollout_summaries/` and `raw_memories.md`) are preserved. 
pub(super) async fn wipe_consolidation_outputs(root: &Path) -> std::io::Result<()> { - for file_name in [MEMORY_REGISTRY_FILENAME, LEGACY_CONSOLIDATED_FILENAME] { - let path = root.join(file_name); - if let Err(err) = tokio::fs::remove_file(&path).await - && err.kind() != std::io::ErrorKind::NotFound - { - warn!( - "failed removing consolidation file {}: {err}", - path.display() - ); - } + let path = root.join(MEMORY_REGISTRY_FILENAME); + if let Err(err) = tokio::fs::remove_file(&path).await + && err.kind() != std::io::ErrorKind::NotFound + { + warn!( + "failed removing consolidation file {}: {err}", + path.display() + ); } let skills_dir = root.join(SKILLS_SUBDIR); @@ -152,7 +149,7 @@ async fn write_rollout_summary_for_thread( .map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?; writeln!(body) .map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?; - body.push_str(&compact_whitespace(&memory.summary)); + body.push_str(&compact_whitespace(&memory.rollout_summary)); body.push('\n'); tokio::fs::write(path, body).await diff --git a/codex-rs/core/src/memories/tests.rs b/codex-rs/core/src/memories/tests.rs index 4de3fccba..489951bae 100644 --- a/codex-rs/core/src/memories/tests.rs +++ b/codex-rs/core/src/memories/tests.rs @@ -5,11 +5,10 @@ use super::stage_one::parse_stage_one_output; use super::storage::rebuild_raw_memories_file_from_memories; use super::storage::sync_rollout_summaries_from_memories; use super::storage::wipe_consolidation_outputs; -use crate::memories::layout::ensure_layout; -use crate::memories::layout::memory_root; -use crate::memories::layout::migrate_legacy_user_memory_root_if_needed; -use crate::memories::layout::raw_memories_file; -use crate::memories::layout::rollout_summaries_dir; +use crate::memories::ensure_layout; +use crate::memories::memory_root; +use crate::memories::raw_memories_file; +use crate::memories::rollout_summaries_dir; use chrono::TimeZone; use chrono::Utc; use 
codex_protocol::ThreadId; @@ -28,49 +27,18 @@ fn memory_root_uses_shared_global_path() { assert_eq!(memory_root(&codex_home), codex_home.join("memories")); } -#[tokio::test] -async fn migrate_legacy_user_memory_root_if_needed_copies_contents() { - let dir = tempdir().expect("tempdir"); - let codex_home = dir.path().join("codex"); - let legacy_root = codex_home.join("memories").join("user").join("memory"); - tokio::fs::create_dir_all(legacy_root.join("rollout_summaries")) - .await - .expect("create legacy rollout summaries dir"); - tokio::fs::write( - legacy_root.join("rollout_summaries").join("thread.md"), - "summary", - ) - .await - .expect("write legacy rollout summary"); - tokio::fs::write(legacy_root.join("raw_memories.md"), "raw") - .await - .expect("write legacy raw memories"); - - migrate_legacy_user_memory_root_if_needed(&codex_home) - .await - .expect("migrate legacy memory root"); - - let root = memory_root(&codex_home); - assert!(root.join("rollout_summaries").join("thread.md").is_file()); - assert!(root.join("raw_memories.md").is_file()); -} - #[test] fn parse_stage_one_output_accepts_fenced_json() { - let raw = "```json\n{\"raw_memory\":\"abc\",\"rollout_summary\":\"short\",\"rollout_slug\":\"slug\"}\n```"; + let raw = "```json\n{\"raw_memory\":\"abc\",\"rollout_summary\":\"short\"}\n```"; let parsed = parse_stage_one_output(raw).expect("parsed"); assert!(parsed.raw_memory.contains("abc")); assert_eq!(parsed.rollout_summary, "short"); - assert_eq!(parsed.rollout_slug, Some("slug".to_string())); } #[test] -fn parse_stage_one_output_accepts_legacy_keys() { +fn parse_stage_one_output_rejects_legacy_keys() { let raw = r#"{"rawMemory":"abc","summary":"short"}"#; - let parsed = parse_stage_one_output(raw).expect("parsed"); - assert!(parsed.raw_memory.contains("abc")); - assert_eq!(parsed.rollout_summary, "short"); - assert_eq!(parsed.rollout_slug, None); + assert!(parse_stage_one_output(raw).is_err()); } #[test] @@ -194,7 +162,7 @@ async fn 
sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only thread_id: ThreadId::try_from(keep_id.clone()).expect("thread id"), source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"), raw_memory: "raw memory".to_string(), - summary: "short summary".to_string(), + rollout_summary: "short summary".to_string(), generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"), }]; @@ -216,13 +184,12 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only } #[tokio::test] -async fn wipe_consolidation_outputs_removes_registry_skills_and_legacy_file() { +async fn wipe_consolidation_outputs_removes_registry_and_skills() { let dir = tempdir().expect("tempdir"); let root = dir.path().join("memory"); ensure_layout(&root).await.expect("ensure layout"); let memory_registry = root.join("MEMORY.md"); - let legacy_consolidated = root.join("consolidated.md"); let skills_dir = root.join("skills").join("example"); tokio::fs::create_dir_all(&skills_dir) @@ -231,15 +198,11 @@ async fn wipe_consolidation_outputs_removes_registry_skills_and_legacy_file() { tokio::fs::write(&memory_registry, "memory") .await .expect("write memory registry"); - tokio::fs::write(&legacy_consolidated, "legacy") - .await - .expect("write legacy consolidated"); wipe_consolidation_outputs(&root) .await .expect("wipe consolidation outputs"); assert!(!memory_registry.exists()); - assert!(!legacy_consolidated.exists()); assert!(!root.join("skills").exists()); } diff --git a/codex-rs/core/src/memories/types.rs b/codex-rs/core/src/memories/types.rs deleted file mode 100644 index 3b68a1c72..000000000 --- a/codex-rs/core/src/memories/types.rs +++ /dev/null @@ -1,15 +0,0 @@ -use serde::Deserialize; - -/// Parsed stage-1 model output payload. -#[derive(Debug, Clone, Deserialize)] -pub(super) struct StageOneOutput { - /// Detailed markdown raw memory for a single rollout. 
- #[serde(rename = "raw_memory", alias = "rawMemory", alias = "traceMemory")] - pub(super) raw_memory: String, - /// Optional rollout slug from the model output. Accepted but ignored. - #[serde(default)] - pub(super) rollout_slug: Option, - /// Compact summary line used for routing and indexing. - #[serde(rename = "rollout_summary", alias = "summary")] - pub(super) rollout_summary: String, -} diff --git a/codex-rs/core/templates/memories/consolidation.md b/codex-rs/core/templates/memories/consolidation.md index 20cdbea44..61db72229 100644 --- a/codex-rs/core/templates/memories/consolidation.md +++ b/codex-rs/core/templates/memories/consolidation.md @@ -19,7 +19,7 @@ Consolidation goals: 4. Deduplicate aggressively and remove generic advice. Expected outputs for this directory (create/update as needed): -- `MEMORY.md`: merged durable memory registry for this CWD. +- `MEMORY.md`: merged durable memory registry for this shared memory root. - `skills//...`: optional skill folders when there is clear reusable procedure value. Do not rewrite phase-1 artifacts except when adding explicit cross-references: diff --git a/codex-rs/core/templates/memories/stage_one_input.md b/codex-rs/core/templates/memories/stage_one_input.md index 259a1a759..8acbffacf 100644 --- a/codex-rs/core/templates/memories/stage_one_input.md +++ b/codex-rs/core/templates/memories/stage_one_input.md @@ -1,4 +1,4 @@ -Analyze this rollout and produce `raw_memory`, `rollout_summary`, and optional `rollout_slug` as JSON. +Analyze this rollout and produce `raw_memory` and `rollout_summary` as JSON. rollout_context: - rollout_path: {{ rollout_path }} diff --git a/codex-rs/core/templates/memories/stage_one_system.md b/codex-rs/core/templates/memories/stage_one_system.md index 054cda9f9..847a0feea 100644 --- a/codex-rs/core/templates/memories/stage_one_system.md +++ b/codex-rs/core/templates/memories/stage_one_system.md @@ -4,7 +4,6 @@ You are given one rollout and must produce exactly one JSON object. 
Return exactly one JSON object with this schema: - raw_memory: a detailed markdown raw memory for this rollout only. - rollout_summary: a concise summary suitable for shared memory aggregation. -- rollout_slug: optional stable slug for the rollout (accepted but currently ignored). Input contract: - The user message contains: diff --git a/codex-rs/state/migrations/0011_generic_jobs_and_stage1_outputs.sql b/codex-rs/state/migrations/0006_memories.sql similarity index 61% rename from codex-rs/state/migrations/0011_generic_jobs_and_stage1_outputs.sql rename to codex-rs/state/migrations/0006_memories.sql index 9095c8b8e..e5e34307f 100644 --- a/codex-rs/state/migrations/0011_generic_jobs_and_stage1_outputs.sql +++ b/codex-rs/state/migrations/0006_memories.sql @@ -1,22 +1,16 @@ -DROP TABLE IF EXISTS thread_memory; -DROP TABLE IF EXISTS memory_phase1_jobs; -DROP TABLE IF EXISTS memory_scope_dirty; -DROP TABLE IF EXISTS memory_phase2_jobs; -DROP TABLE IF EXISTS memory_consolidation_locks; - -CREATE TABLE IF NOT EXISTS stage1_outputs ( +CREATE TABLE stage1_outputs ( thread_id TEXT PRIMARY KEY, source_updated_at INTEGER NOT NULL, raw_memory TEXT NOT NULL, - summary TEXT NOT NULL, + rollout_summary TEXT NOT NULL, generated_at INTEGER NOT NULL, FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE ); -CREATE INDEX IF NOT EXISTS idx_stage1_outputs_source_updated_at +CREATE INDEX idx_stage1_outputs_source_updated_at ON stage1_outputs(source_updated_at DESC, thread_id DESC); -CREATE TABLE IF NOT EXISTS jobs ( +CREATE TABLE jobs ( kind TEXT NOT NULL, job_key TEXT NOT NULL, status TEXT NOT NULL, @@ -33,5 +27,5 @@ CREATE TABLE IF NOT EXISTS jobs ( PRIMARY KEY (kind, job_key) ); -CREATE INDEX IF NOT EXISTS idx_jobs_kind_status_retry_lease +CREATE INDEX idx_jobs_kind_status_retry_lease ON jobs(kind, status, retry_at, lease_until); diff --git a/codex-rs/state/migrations/0006_thread_memory.sql b/codex-rs/state/migrations/0006_thread_memory.sql deleted file mode 100644 index 
fe90ab667..000000000 --- a/codex-rs/state/migrations/0006_thread_memory.sql +++ /dev/null @@ -1,9 +0,0 @@ -CREATE TABLE thread_memory ( - thread_id TEXT PRIMARY KEY, - trace_summary TEXT NOT NULL, - memory_summary TEXT NOT NULL, - updated_at INTEGER NOT NULL, - FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE -); - -CREATE INDEX idx_thread_memory_updated_at ON thread_memory(updated_at DESC, thread_id DESC); diff --git a/codex-rs/state/migrations/0009_memory_consolidation_locks.sql b/codex-rs/state/migrations/0009_memory_consolidation_locks.sql deleted file mode 100644 index 793ab4dc3..000000000 --- a/codex-rs/state/migrations/0009_memory_consolidation_locks.sql +++ /dev/null @@ -1,8 +0,0 @@ -CREATE TABLE memory_consolidation_locks ( - cwd TEXT PRIMARY KEY, - working_thread_id TEXT NOT NULL, - updated_at INTEGER NOT NULL -); - -CREATE INDEX idx_memory_consolidation_locks_updated_at - ON memory_consolidation_locks(updated_at DESC); diff --git a/codex-rs/state/migrations/0010_memory_workflow_v2.sql b/codex-rs/state/migrations/0010_memory_workflow_v2.sql deleted file mode 100644 index 475b5f6e3..000000000 --- a/codex-rs/state/migrations/0010_memory_workflow_v2.sql +++ /dev/null @@ -1,85 +0,0 @@ -DROP TABLE IF EXISTS thread_memory; -DROP TABLE IF EXISTS memory_consolidation_locks; -DROP TABLE IF EXISTS memory_phase1_jobs; -DROP TABLE IF EXISTS memory_scope_dirty; -DROP TABLE IF EXISTS memory_phase2_jobs; - -CREATE TABLE thread_memory ( - thread_id TEXT NOT NULL, - scope_kind TEXT NOT NULL, - scope_key TEXT NOT NULL, - raw_memory TEXT NOT NULL, - memory_summary TEXT NOT NULL, - updated_at INTEGER NOT NULL, - last_used_at INTEGER, - used_count INTEGER NOT NULL DEFAULT 0, - invalidated_at INTEGER, - invalid_reason TEXT, - PRIMARY KEY (thread_id, scope_kind, scope_key), - FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE -); - -CREATE INDEX idx_thread_memory_scope_last_used_at - ON thread_memory(scope_kind, scope_key, last_used_at DESC, 
thread_id DESC); -CREATE INDEX idx_thread_memory_scope_updated_at - ON thread_memory(scope_kind, scope_key, updated_at DESC, thread_id DESC); - -CREATE TABLE memory_phase1_jobs ( - thread_id TEXT NOT NULL, - scope_kind TEXT NOT NULL, - scope_key TEXT NOT NULL, - status TEXT NOT NULL, - owner_session_id TEXT, - started_at INTEGER, - finished_at INTEGER, - failure_reason TEXT, - source_updated_at INTEGER NOT NULL, - raw_memory_path TEXT, - summary_hash TEXT, - ownership_token TEXT, - PRIMARY KEY (thread_id, scope_kind, scope_key), - FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE -); - -CREATE INDEX idx_memory_phase1_jobs_status_started_at - ON memory_phase1_jobs(status, started_at DESC); -CREATE INDEX idx_memory_phase1_jobs_scope - ON memory_phase1_jobs(scope_kind, scope_key); - -CREATE TABLE memory_scope_dirty ( - scope_kind TEXT NOT NULL, - scope_key TEXT NOT NULL, - dirty INTEGER NOT NULL, - updated_at INTEGER NOT NULL, - PRIMARY KEY (scope_kind, scope_key) -); - -CREATE INDEX idx_memory_scope_dirty_dirty - ON memory_scope_dirty(dirty, updated_at DESC); - -CREATE TABLE memory_phase2_jobs ( - scope_kind TEXT NOT NULL, - scope_key TEXT NOT NULL, - status TEXT NOT NULL, - owner_session_id TEXT, - agent_thread_id TEXT, - started_at INTEGER, - last_heartbeat_at INTEGER, - finished_at INTEGER, - attempt INTEGER NOT NULL DEFAULT 0, - failure_reason TEXT, - ownership_token TEXT, - PRIMARY KEY (scope_kind, scope_key) -); - -CREATE INDEX idx_memory_phase2_jobs_status_heartbeat - ON memory_phase2_jobs(status, last_heartbeat_at DESC); - -CREATE TABLE memory_consolidation_locks ( - cwd TEXT PRIMARY KEY, - working_thread_id TEXT NOT NULL, - updated_at INTEGER NOT NULL -); - -CREATE INDEX idx_memory_consolidation_locks_updated_at - ON memory_consolidation_locks(updated_at DESC); diff --git a/codex-rs/state/src/lib.rs b/codex-rs/state/src/lib.rs index 5564c3607..eb405a849 100644 --- a/codex-rs/state/src/lib.rs +++ b/codex-rs/state/src/lib.rs @@ -14,6 +14,7 @@ mod 
runtime; pub use model::LogEntry; pub use model::LogQuery; pub use model::LogRow; +pub use model::Phase2JobClaimOutcome; /// Preferred entrypoint: owns configuration and metrics. pub use runtime::StateRuntime; @@ -27,16 +28,15 @@ pub use model::BackfillStats; pub use model::BackfillStatus; pub use model::ExtractionOutcome; pub use model::SortKey; +pub use model::Stage1JobClaim; +pub use model::Stage1JobClaimOutcome; pub use model::Stage1Output; +pub use model::Stage1StartupClaimParams; pub use model::ThreadMetadata; pub use model::ThreadMetadataBuilder; pub use model::ThreadsPage; -pub use runtime::Phase2JobClaimOutcome; pub use runtime::STATE_DB_FILENAME; pub use runtime::STATE_DB_VERSION; -pub use runtime::Stage1JobClaim; -pub use runtime::Stage1JobClaimOutcome; -pub use runtime::Stage1StartupClaimParams; pub use runtime::state_db_filename; pub use runtime::state_db_path; diff --git a/codex-rs/state/src/model/memories.rs b/codex-rs/state/src/model/memories.rs new file mode 100644 index 000000000..81a7354c5 --- /dev/null +++ b/codex-rs/state/src/model/memories.rs @@ -0,0 +1,105 @@ +use anyhow::Result; +use chrono::DateTime; +use chrono::Utc; +use codex_protocol::ThreadId; +use sqlx::Row; +use sqlx::sqlite::SqliteRow; + +use super::ThreadMetadata; + +/// Stored stage-1 memory extraction output for a single thread. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Stage1Output { + pub thread_id: ThreadId, + pub source_updated_at: DateTime, + pub raw_memory: String, + pub rollout_summary: String, + pub generated_at: DateTime, +} + +#[derive(Debug)] +pub(crate) struct Stage1OutputRow { + thread_id: String, + source_updated_at: i64, + raw_memory: String, + rollout_summary: String, + generated_at: i64, +} + +impl Stage1OutputRow { + pub(crate) fn try_from_row(row: &SqliteRow) -> Result { + Ok(Self { + thread_id: row.try_get("thread_id")?, + source_updated_at: row.try_get("source_updated_at")?, + raw_memory: row.try_get("raw_memory")?, + rollout_summary: row.try_get("rollout_summary")?, + generated_at: row.try_get("generated_at")?, + }) + } +} + +impl TryFrom for Stage1Output { + type Error = anyhow::Error; + + fn try_from(row: Stage1OutputRow) -> std::result::Result { + Ok(Self { + thread_id: ThreadId::try_from(row.thread_id)?, + source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?, + raw_memory: row.raw_memory, + rollout_summary: row.rollout_summary, + generated_at: epoch_seconds_to_datetime(row.generated_at)?, + }) + } +} + +fn epoch_seconds_to_datetime(secs: i64) -> Result> { + DateTime::::from_timestamp(secs, 0) + .ok_or_else(|| anyhow::anyhow!("invalid unix timestamp: {secs}")) +} + +/// Result of trying to claim a stage-1 memory extraction job. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Stage1JobClaimOutcome { + /// The caller owns the job and should continue with extraction. + Claimed { ownership_token: String }, + /// Existing output is already newer than or equal to the source rollout. + SkippedUpToDate, + /// Another worker currently owns a fresh lease for this job. + SkippedRunning, + /// The job is in backoff and should not be retried yet. + SkippedRetryBackoff, + /// The job has exhausted retries and should not be retried automatically. + SkippedRetryExhausted, +} + +/// Claimed stage-1 job with thread metadata. 
+#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Stage1JobClaim { + pub thread: ThreadMetadata, + pub ownership_token: String, +} + +#[derive(Debug, Clone, Copy)] +pub struct Stage1StartupClaimParams<'a> { + pub scan_limit: usize, + pub max_claimed: usize, + pub max_age_days: i64, + pub min_rollout_idle_hours: i64, + pub allowed_sources: &'a [String], + pub lease_seconds: i64, +} + +/// Result of trying to claim a phase-2 consolidation job. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Phase2JobClaimOutcome { + /// The caller owns the global lock and should spawn consolidation. + Claimed { + ownership_token: String, + /// Snapshot of `input_watermark` at claim time. + input_watermark: i64, + }, + /// The global job is not pending consolidation (or is already up to date). + SkippedNotDirty, + /// Another worker currently owns a fresh global consolidation lease. + SkippedRunning, +} diff --git a/codex-rs/state/src/model/mod.rs b/codex-rs/state/src/model/mod.rs index 57fc80596..2a0009d64 100644 --- a/codex-rs/state/src/model/mod.rs +++ b/codex-rs/state/src/model/mod.rs @@ -1,6 +1,6 @@ mod backfill_state; mod log; -mod stage1_output; +mod memories; mod thread_metadata; pub use backfill_state::BackfillState; @@ -8,7 +8,11 @@ pub use backfill_state::BackfillStatus; pub use log::LogEntry; pub use log::LogQuery; pub use log::LogRow; -pub use stage1_output::Stage1Output; +pub use memories::Phase2JobClaimOutcome; +pub use memories::Stage1JobClaim; +pub use memories::Stage1JobClaimOutcome; +pub use memories::Stage1Output; +pub use memories::Stage1StartupClaimParams; pub use thread_metadata::Anchor; pub use thread_metadata::BackfillStats; pub use thread_metadata::ExtractionOutcome; @@ -17,7 +21,7 @@ pub use thread_metadata::ThreadMetadata; pub use thread_metadata::ThreadMetadataBuilder; pub use thread_metadata::ThreadsPage; -pub(crate) use stage1_output::Stage1OutputRow; +pub(crate) use memories::Stage1OutputRow; pub(crate) use thread_metadata::ThreadRow; pub(crate) use 
thread_metadata::anchor_from_item; pub(crate) use thread_metadata::datetime_to_epoch_seconds; diff --git a/codex-rs/state/src/model/stage1_output.rs b/codex-rs/state/src/model/stage1_output.rs deleted file mode 100644 index e69c6db62..000000000 --- a/codex-rs/state/src/model/stage1_output.rs +++ /dev/null @@ -1,56 +0,0 @@ -use anyhow::Result; -use chrono::DateTime; -use chrono::Utc; -use codex_protocol::ThreadId; -use sqlx::Row; -use sqlx::sqlite::SqliteRow; - -/// Stored stage-1 memory extraction output for a single thread. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Stage1Output { - pub thread_id: ThreadId, - pub source_updated_at: DateTime, - pub raw_memory: String, - pub summary: String, - pub generated_at: DateTime, -} - -#[derive(Debug)] -pub(crate) struct Stage1OutputRow { - thread_id: String, - source_updated_at: i64, - raw_memory: String, - summary: String, - generated_at: i64, -} - -impl Stage1OutputRow { - pub(crate) fn try_from_row(row: &SqliteRow) -> Result { - Ok(Self { - thread_id: row.try_get("thread_id")?, - source_updated_at: row.try_get("source_updated_at")?, - raw_memory: row.try_get("raw_memory")?, - summary: row.try_get("summary")?, - generated_at: row.try_get("generated_at")?, - }) - } -} - -impl TryFrom for Stage1Output { - type Error = anyhow::Error; - - fn try_from(row: Stage1OutputRow) -> std::result::Result { - Ok(Self { - thread_id: ThreadId::try_from(row.thread_id)?, - source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?, - raw_memory: row.raw_memory, - summary: row.summary, - generated_at: epoch_seconds_to_datetime(row.generated_at)?, - }) - } -} - -fn epoch_seconds_to_datetime(secs: i64) -> Result> { - DateTime::::from_timestamp(secs, 0) - .ok_or_else(|| anyhow::anyhow!("invalid unix timestamp: {secs}")) -} diff --git a/codex-rs/state/src/runtime.rs b/codex-rs/state/src/runtime.rs index 21f55d068..1a7b5f04a 100644 --- a/codex-rs/state/src/runtime.rs +++ b/codex-rs/state/src/runtime.rs @@ -41,8 +41,8 @@ pub 
const STATE_DB_VERSION: u32 = 4; const METRIC_DB_INIT: &str = "codex.db.init"; -mod memory; -// Memory-specific CRUD and phase job lifecycle methods live in `runtime/memory.rs`. +mod memories; +// Memory-specific CRUD and phase job lifecycle methods live in `runtime/memories.rs`. #[derive(Clone)] pub struct StateRuntime { @@ -51,53 +51,6 @@ pub struct StateRuntime { pool: Arc, } -/// Result of trying to claim a stage-1 memory extraction job. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Stage1JobClaimOutcome { - /// The caller owns the job and should continue with extraction. - Claimed { ownership_token: String }, - /// Existing output is already newer than or equal to the source rollout. - SkippedUpToDate, - /// Another worker currently owns a fresh lease for this job. - SkippedRunning, - /// The job is in backoff and should not be retried yet. - SkippedRetryBackoff, - /// The job has exhausted retries and should not be retried automatically. - SkippedRetryExhausted, -} - -/// Claimed stage-1 job with thread metadata. -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct Stage1JobClaim { - pub thread: ThreadMetadata, - pub ownership_token: String, -} - -#[derive(Debug, Clone, Copy)] -pub struct Stage1StartupClaimParams<'a> { - pub scan_limit: usize, - pub max_claimed: usize, - pub max_age_days: i64, - pub min_rollout_idle_hours: i64, - pub allowed_sources: &'a [String], - pub lease_seconds: i64, -} - -/// Result of trying to claim a phase-2 consolidation job. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Phase2JobClaimOutcome { - /// The caller owns the global lock and should spawn consolidation. - Claimed { - ownership_token: String, - /// Snapshot of `input_watermark` at claim time. - input_watermark: i64, - }, - /// The global job is not pending consolidation (or is already up to date). - SkippedNotDirty, - /// Another worker currently owns a fresh global consolidation lease. 
- SkippedRunning, -} - impl StateRuntime { /// Initialize the state runtime using the provided Codex home and default provider. /// @@ -914,14 +867,14 @@ fn push_thread_order_and_limit( #[cfg(test)] mod tests { - use super::Phase2JobClaimOutcome; use super::STATE_DB_FILENAME; use super::STATE_DB_VERSION; - use super::Stage1JobClaimOutcome; - use super::Stage1StartupClaimParams; use super::StateRuntime; use super::ThreadMetadata; use super::state_db_filename; + use crate::model::Phase2JobClaimOutcome; + use crate::model::Stage1JobClaimOutcome; + use crate::model::Stage1StartupClaimParams; use chrono::DateTime; use chrono::Duration; use chrono::Utc; @@ -1582,9 +1535,9 @@ WHERE kind = 'memory_stage1' .expect("list stage1 outputs for global"); assert_eq!(outputs.len(), 2); assert_eq!(outputs[0].thread_id, thread_id_b); - assert_eq!(outputs[0].summary, "summary b"); + assert_eq!(outputs[0].rollout_summary, "summary b"); assert_eq!(outputs[1].thread_id, thread_id_a); - assert_eq!(outputs[1].summary, "summary a"); + assert_eq!(outputs[1].rollout_summary, "summary a"); let _ = tokio::fs::remove_dir_all(codex_home).await; } diff --git a/codex-rs/state/src/runtime/memory.rs b/codex-rs/state/src/runtime/memories.rs similarity index 97% rename from codex-rs/state/src/runtime/memory.rs rename to codex-rs/state/src/runtime/memories.rs index 65ce9b08f..fdca79daf 100644 --- a/codex-rs/state/src/runtime/memory.rs +++ b/codex-rs/state/src/runtime/memories.rs @@ -1,6 +1,10 @@ use super::*; -use crate::Stage1Output; +use crate::model::Phase2JobClaimOutcome; +use crate::model::Stage1JobClaim; +use crate::model::Stage1JobClaimOutcome; +use crate::model::Stage1Output; use crate::model::Stage1OutputRow; +use crate::model::Stage1StartupClaimParams; use crate::model::ThreadRow; use chrono::Duration; use sqlx::Executor; @@ -117,7 +121,7 @@ FROM threads ) -> anyhow::Result<Option<Stage1Output>> { let row = sqlx::query( r#" -SELECT thread_id,
source_updated_at, raw_memory, rollout_summary, generated_at FROM stage1_outputs WHERE thread_id = ? "#, @@ -140,9 +144,8 @@ WHERE thread_id = ? let rows = sqlx::query( r#" -SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.summary, so.generated_at +SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at FROM stage1_outputs AS so -JOIN threads AS t ON t.id = so.thread_id ORDER BY so.source_updated_at DESC, so.thread_id DESC LIMIT ? "#, @@ -329,7 +332,7 @@ WHERE kind = ? AND job_key = ? ownership_token: &str, source_updated_at: i64, raw_memory: &str, - summary: &str, + rollout_summary: &str, ) -> anyhow::Result { let now = Utc::now().timestamp(); let thread_id = thread_id.to_string(); @@ -367,13 +370,13 @@ INSERT INTO stage1_outputs ( thread_id, source_updated_at, raw_memory, - summary, + rollout_summary, generated_at ) VALUES (?, ?, ?, ?, ?) ON CONFLICT(thread_id) DO UPDATE SET source_updated_at = excluded.source_updated_at, raw_memory = excluded.raw_memory, - summary = excluded.summary, + rollout_summary = excluded.rollout_summary, generated_at = excluded.generated_at WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at "#, @@ -381,7 +384,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at .bind(thread_id.as_str()) .bind(source_updated_at) .bind(raw_memory) - .bind(summary) + .bind(rollout_summary) .bind(now) .execute(&mut *tx) .await?;