feat: mem v2 - PR5 (#11372)

jif-oai 2026-02-10 23:22:55 +00:00 committed by GitHub
parent 34fb4b6e63
commit 2c9be54c9a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 208 additions and 433 deletions

View file

@ -1,83 +0,0 @@
use std::path::Path;
use std::path::PathBuf;
pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
pub(super) const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md";
pub(super) const LEGACY_CONSOLIDATED_FILENAME: &str = "consolidated.md";
pub(super) const SKILLS_SUBDIR: &str = "skills";
const LEGACY_USER_SUBDIR: &str = "user";
const LEGACY_MEMORY_SUBDIR: &str = "memory";
/// Returns the shared on-disk memory root directory.
pub(super) fn memory_root(codex_home: &Path) -> PathBuf {
codex_home.join("memories")
}
pub(super) fn rollout_summaries_dir(root: &Path) -> PathBuf {
root.join(ROLLOUT_SUMMARIES_SUBDIR)
}
pub(super) fn raw_memories_file(root: &Path) -> PathBuf {
root.join(RAW_MEMORIES_FILENAME)
}
/// Migrates legacy user memory contents into the shared root when no shared-root
/// phase artifacts exist yet.
pub(super) async fn migrate_legacy_user_memory_root_if_needed(
codex_home: &Path,
) -> std::io::Result<()> {
let root = memory_root(codex_home);
let legacy = legacy_user_memory_root(codex_home);
if !tokio::fs::try_exists(&legacy).await? || global_root_has_phase_artifacts(&root).await? {
return Ok(());
}
copy_dir_contents_if_missing(&legacy, &root).await
}
/// Ensures the phase-1 memory directory layout exists for the given root.
pub(super) async fn ensure_layout(root: &Path) -> std::io::Result<()> {
tokio::fs::create_dir_all(rollout_summaries_dir(root)).await
}
fn legacy_user_memory_root(codex_home: &Path) -> PathBuf {
codex_home
.join("memories")
.join(LEGACY_USER_SUBDIR)
.join(LEGACY_MEMORY_SUBDIR)
}
async fn global_root_has_phase_artifacts(root: &Path) -> std::io::Result<bool> {
if tokio::fs::try_exists(&rollout_summaries_dir(root)).await?
|| tokio::fs::try_exists(&raw_memories_file(root)).await?
|| tokio::fs::try_exists(&root.join(MEMORY_REGISTRY_FILENAME)).await?
|| tokio::fs::try_exists(&root.join(LEGACY_CONSOLIDATED_FILENAME)).await?
|| tokio::fs::try_exists(&root.join(SKILLS_SUBDIR)).await?
{
return Ok(true);
}
Ok(false)
}
fn copy_dir_contents_if_missing<'a>(
src_dir: &'a Path,
dst_dir: &'a Path,
) -> futures::future::BoxFuture<'a, std::io::Result<()>> {
Box::pin(async move {
tokio::fs::create_dir_all(dst_dir).await?;
let mut dir = tokio::fs::read_dir(src_dir).await?;
while let Some(entry) = dir.next_entry().await? {
let src_path = entry.path();
let dst_path = dst_dir.join(entry.file_name());
let metadata = entry.metadata().await?;
if metadata.is_dir() {
copy_dir_contents_if_missing(&src_path, &dst_path).await?;
} else if metadata.is_file() && !tokio::fs::try_exists(&dst_path).await? {
tokio::fs::copy(&src_path, &dst_path).await?;
}
}
Ok(())
})
}
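The deleted `copy_dir_contents_if_missing` used the standard workaround for recursive async functions in Rust: return a `BoxFuture` and wrap the body in `Box::pin`, since a plain recursive `async fn` would have an infinitely sized future type. A minimal sketch of the same pattern, with a hypothetical `count_files` helper:

use futures::future::BoxFuture;
use std::path::Path;

// Boxed recursion: the recursive call awaits a heap-allocated future,
// so the outer future type stays finitely sized.
fn count_files(dir: &Path) -> BoxFuture<'_, std::io::Result<u64>> {
    Box::pin(async move {
        let mut total = 0;
        let mut entries = tokio::fs::read_dir(dir).await?;
        while let Some(entry) = entries.next_entry().await? {
            let path = entry.path();
            if entry.metadata().await?.is_dir() {
                total += count_files(&path).await?;
            } else {
                total += 1;
            }
        }
        Ok(total)
    })
}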

View file

@ -4,20 +4,26 @@
//! - Phase 1: select rollouts, extract stage-1 raw memories, persist stage-1 outputs, and enqueue consolidation.
//! - Phase 2: claim a global consolidation lock, materialize consolidation inputs, and dispatch one consolidation agent.
mod layout;
mod prompts;
mod rollout;
mod stage_one;
mod startup;
mod storage;
mod text;
mod types;
#[cfg(test)]
mod tests;
use serde::Deserialize;
use std::path::Path;
use std::path::PathBuf;
/// Subagent source label used to identify consolidation tasks.
const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation";
const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md";
const SKILLS_SUBDIR: &str = "skills";
/// Maximum number of rollout candidates processed per startup pass.
const MAX_ROLLOUTS_PER_STARTUP: usize = 64;
/// Concurrency cap for startup memory extraction and consolidation scheduling.
@ -39,6 +45,34 @@ const PHASE_TWO_JOB_RETRY_DELAY_SECONDS: i64 = 3_600;
/// Heartbeat interval (seconds) for phase-2 running jobs.
const PHASE_TWO_JOB_HEARTBEAT_SECONDS: u64 = 30;
/// Parsed stage-1 model output payload.
#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
struct StageOneOutput {
/// Detailed markdown raw memory for a single rollout.
#[serde(rename = "raw_memory")]
raw_memory: String,
/// Compact summary line used for routing and indexing.
#[serde(rename = "rollout_summary")]
rollout_summary: String,
}
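Compared to the deleted `types.rs` version, this struct derives `#[serde(deny_unknown_fields)]` and drops the `alias` attributes along with the ignored `rollout_slug` field, so legacy payload shapes now fail deserialization outright (see the `parse_stage_one_output_rejects_legacy_keys` test later in this diff). A quick sketch of the resulting behavior:

// Current key names deserialize; legacy keys are now hard errors.
let ok: Result<StageOneOutput, _> =
    serde_json::from_str(r#"{"raw_memory":"abc","rollout_summary":"short"}"#);
assert!(ok.is_ok());

let legacy: Result<StageOneOutput, _> =
    serde_json::from_str(r#"{"rawMemory":"abc","summary":"short"}"#);
assert!(legacy.is_err()); // `rawMemory` and `summary` are unknown fields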
fn memory_root(codex_home: &Path) -> PathBuf {
codex_home.join("memories")
}
fn rollout_summaries_dir(root: &Path) -> PathBuf {
root.join(ROLLOUT_SUMMARIES_SUBDIR)
}
fn raw_memories_file(root: &Path) -> PathBuf {
root.join(RAW_MEMORIES_FILENAME)
}
async fn ensure_layout(root: &Path) -> std::io::Result<()> {
tokio::fs::create_dir_all(rollout_summaries_dir(root)).await
}
/// Starts the memory startup pipeline for eligible root sessions.
///
/// This is the single entrypoint that `codex` uses to trigger memory startup.

View file

@ -5,9 +5,9 @@ use regex::Regex;
use serde_json::Value;
use serde_json::json;
use super::StageOneOutput;
use super::text::compact_whitespace;
use super::text::truncate_text_for_storage;
use super::types::StageOneOutput;
/// System prompt for stage-1 raw memory extraction.
pub(super) const RAW_MEMORY_PROMPT: &str =
@ -28,7 +28,6 @@ pub(super) fn stage_one_output_schema() -> Value {
json!({
"type": "object",
"properties": {
"rollout_slug": { "type": "string" },
"rollout_summary": { "type": "string" },
"raw_memory": { "type": "string" }
},
@ -97,12 +96,6 @@ fn parse_json_object_loose(raw: &str) -> Result<Value> {
fn normalize_stage_one_output(mut output: StageOneOutput) -> Result<StageOneOutput> {
output.raw_memory = output.raw_memory.trim().to_string();
output.rollout_summary = output.rollout_summary.trim().to_string();
if let Some(slug) = output.rollout_slug.take() {
let slug = slug.trim();
if !slug.is_empty() {
output.rollout_slug = Some(slug.to_string());
}
}
if output.raw_memory.is_empty() {
return Err(CodexErr::InvalidRequest(
@ -195,7 +188,6 @@ mod tests {
fn normalize_stage_one_output_redacts_and_compacts_summary() {
let output = StageOneOutput {
raw_memory: "Token: sk-abcdefghijklmnopqrstuvwxyz123456\nBearer abcdefghijklmnopqrstuvwxyz012345".to_string(),
rollout_slug: None,
rollout_summary: "password = mysecret123456\n\nsmall".to_string(),
};
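The `parse_json_object_loose` helper named in the hunk header above is not shown here; per the `accepts_fenced_json` test, it tolerates a Markdown code fence around the model's JSON. An illustrative stand-in (not the crate's actual implementation):

// Sketch only: strip an optional ```json fence, then parse strictly.
fn parse_loose(raw: &str) -> serde_json::Result<serde_json::Value> {
    let trimmed = raw.trim();
    let body = trimmed
        .strip_prefix("```json")
        .or_else(|| trimmed.strip_prefix("```"))
        .and_then(|rest| rest.strip_suffix("```"))
        .map(str::trim)
        .unwrap_or(trimmed);
    serde_json::from_str(body)
}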

View file

@ -1,6 +1,6 @@
use crate::codex::Session;
use crate::config::Config;
use crate::memories::layout::memory_root;
use crate::memories::memory_root;
use codex_protocol::protocol::SessionSource;
use codex_protocol::protocol::SubAgentSource;
use codex_protocol::user_input::UserInput;
@ -17,7 +17,7 @@ use super::super::prompts::build_consolidation_prompt;
use super::super::storage::rebuild_raw_memories_file_from_memories;
use super::super::storage::sync_rollout_summaries_from_memories;
use super::super::storage::wipe_consolidation_outputs;
use super::watch::spawn_phase2_completion_task;
use super::phase2::spawn_phase2_completion_task;
pub(super) async fn run_global_memory_consolidation(
session: &Arc<Session>,

View file

@ -12,13 +12,13 @@ use futures::StreamExt;
use tracing::warn;
use super::StageOneRequestContext;
use crate::memories::StageOneOutput;
use crate::memories::prompts::build_stage_one_input_message;
use crate::memories::rollout::StageOneRolloutFilter;
use crate::memories::rollout::serialize_filtered_rollout_response_items;
use crate::memories::stage_one::RAW_MEMORY_PROMPT;
use crate::memories::stage_one::parse_stage_one_output;
use crate::memories::stage_one::stage_one_output_schema;
use crate::memories::types::StageOneOutput;
use std::path::Path;
pub(super) async fn extract_stage_one_output(

View file

@ -1,13 +1,12 @@
mod dispatch;
mod extract;
mod watch;
mod phase2;
use crate::codex::Session;
use crate::codex::TurnContext;
use crate::config::Config;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::memories::layout::migrate_legacy_user_memory_root_if_needed;
use crate::rollout::INTERACTIVE_SESSION_SOURCES;
use codex_otel::OtelManager;
use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig;
@ -15,7 +14,6 @@ use codex_protocol::openai_models::ModelInfo;
use codex_protocol::openai_models::ReasoningEffort as ReasoningEffortConfig;
use codex_protocol::protocol::SessionSource;
use futures::StreamExt;
use serde_json::Value;
use std::sync::Arc;
use tracing::info;
use tracing::warn;
@ -80,10 +78,6 @@ pub(super) async fn run_memories_startup_pipeline(
session: &Arc<Session>,
config: Arc<Config>,
) -> CodexResult<()> {
if let Err(err) = migrate_legacy_user_memory_root_if_needed(&config.codex_home).await {
warn!("failed migrating legacy shared memory root: {err}");
}
let Some(state_db) = session.services.state_db.as_deref() else {
warn!("state db unavailable for memories startup pipeline; skipping");
return Ok(());
@ -91,11 +85,7 @@ pub(super) async fn run_memories_startup_pipeline(
let allowed_sources = INTERACTIVE_SESSION_SOURCES
.iter()
.map(|value| match serde_json::to_value(value) {
Ok(Value::String(s)) => s,
Ok(other) => other.to_string(),
Err(_) => String::new(),
})
.map(ToString::to_string)
.collect::<Vec<_>>();
let claimed_candidates = match state_db
@ -186,7 +176,8 @@ pub(super) async fn run_memories_startup_pipeline(
claimed_count, succeeded_count
);
let consolidation_job_count = run_consolidation_dispatch(session, config).await;
let consolidation_job_count =
usize::from(dispatch::run_global_memory_consolidation(session, config).await);
info!(
"memory consolidation dispatch complete: {} job(s) scheduled",
consolidation_job_count
@ -194,7 +185,3 @@ pub(super) async fn run_memories_startup_pipeline(
Ok(())
}
async fn run_consolidation_dispatch(session: &Arc<Session>, config: Arc<Config>) -> usize {
usize::from(dispatch::run_global_memory_consolidation(session, config).await)
}
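Two simplifications land here: the allowed-sources list is stringified with `ToString` instead of a `serde_json` round-trip (assuming each source's `Display` output matches its serialized string form), and the one-line `run_consolidation_dispatch` wrapper is inlined. The job count comes from std's `From<bool> for usize`:

// A successful dispatch counts as exactly one scheduled job.
assert_eq!(usize::from(true), 1);
assert_eq!(usize::from(false), 0);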

View file

@ -5,13 +5,12 @@ use std::path::Path;
use tracing::warn;
use super::MAX_RAW_MEMORIES_FOR_GLOBAL;
use super::MEMORY_REGISTRY_FILENAME;
use super::SKILLS_SUBDIR;
use super::ensure_layout;
use super::raw_memories_file;
use super::rollout_summaries_dir;
use super::text::compact_whitespace;
use crate::memories::layout::LEGACY_CONSOLIDATED_FILENAME;
use crate::memories::layout::MEMORY_REGISTRY_FILENAME;
use crate::memories::layout::SKILLS_SUBDIR;
use crate::memories::layout::ensure_layout;
use crate::memories::layout::raw_memories_file;
use crate::memories::layout::rollout_summaries_dir;
/// Rebuild `raw_memories.md` from DB-backed stage-1 outputs.
pub(super) async fn rebuild_raw_memories_file_from_memories(
@ -49,16 +48,14 @@ pub(super) async fn sync_rollout_summaries_from_memories(
///
/// Phase-1 artifacts (`rollout_summaries/` and `raw_memories.md`) are preserved.
pub(super) async fn wipe_consolidation_outputs(root: &Path) -> std::io::Result<()> {
for file_name in [MEMORY_REGISTRY_FILENAME, LEGACY_CONSOLIDATED_FILENAME] {
let path = root.join(file_name);
if let Err(err) = tokio::fs::remove_file(&path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
warn!(
"failed removing consolidation file {}: {err}",
path.display()
);
}
let path = root.join(MEMORY_REGISTRY_FILENAME);
if let Err(err) = tokio::fs::remove_file(&path).await
&& err.kind() != std::io::ErrorKind::NotFound
{
warn!(
"failed removing consolidation file {}: {err}",
path.display()
);
}
let skills_dir = root.join(SKILLS_SUBDIR);
@ -152,7 +149,7 @@ async fn write_rollout_summary_for_thread(
.map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
writeln!(body)
.map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
body.push_str(&compact_whitespace(&memory.summary));
body.push_str(&compact_whitespace(&memory.rollout_summary));
body.push('\n');
tokio::fs::write(path, body).await
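With the legacy `consolidated.md` gone, the removal loop collapses to a single idempotent delete. The remove-and-ignore-`NotFound` shape (a let-chain, as in the code above) generalizes into a small helper; a sketch:

// Sketch: idempotent file removal that only warns on real failures.
async fn remove_if_exists(path: &std::path::Path) {
    if let Err(err) = tokio::fs::remove_file(path).await
        && err.kind() != std::io::ErrorKind::NotFound
    {
        tracing::warn!("failed removing {}: {err}", path.display());
    }
}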

View file

@ -5,11 +5,10 @@ use super::stage_one::parse_stage_one_output;
use super::storage::rebuild_raw_memories_file_from_memories;
use super::storage::sync_rollout_summaries_from_memories;
use super::storage::wipe_consolidation_outputs;
use crate::memories::layout::ensure_layout;
use crate::memories::layout::memory_root;
use crate::memories::layout::migrate_legacy_user_memory_root_if_needed;
use crate::memories::layout::raw_memories_file;
use crate::memories::layout::rollout_summaries_dir;
use crate::memories::ensure_layout;
use crate::memories::memory_root;
use crate::memories::raw_memories_file;
use crate::memories::rollout_summaries_dir;
use chrono::TimeZone;
use chrono::Utc;
use codex_protocol::ThreadId;
@ -28,49 +27,18 @@ fn memory_root_uses_shared_global_path() {
assert_eq!(memory_root(&codex_home), codex_home.join("memories"));
}
#[tokio::test]
async fn migrate_legacy_user_memory_root_if_needed_copies_contents() {
let dir = tempdir().expect("tempdir");
let codex_home = dir.path().join("codex");
let legacy_root = codex_home.join("memories").join("user").join("memory");
tokio::fs::create_dir_all(legacy_root.join("rollout_summaries"))
.await
.expect("create legacy rollout summaries dir");
tokio::fs::write(
legacy_root.join("rollout_summaries").join("thread.md"),
"summary",
)
.await
.expect("write legacy rollout summary");
tokio::fs::write(legacy_root.join("raw_memories.md"), "raw")
.await
.expect("write legacy raw memories");
migrate_legacy_user_memory_root_if_needed(&codex_home)
.await
.expect("migrate legacy memory root");
let root = memory_root(&codex_home);
assert!(root.join("rollout_summaries").join("thread.md").is_file());
assert!(root.join("raw_memories.md").is_file());
}
#[test]
fn parse_stage_one_output_accepts_fenced_json() {
let raw = "```json\n{\"raw_memory\":\"abc\",\"rollout_summary\":\"short\",\"rollout_slug\":\"slug\"}\n```";
let raw = "```json\n{\"raw_memory\":\"abc\",\"rollout_summary\":\"short\"}\n```";
let parsed = parse_stage_one_output(raw).expect("parsed");
assert!(parsed.raw_memory.contains("abc"));
assert_eq!(parsed.rollout_summary, "short");
assert_eq!(parsed.rollout_slug, Some("slug".to_string()));
}
#[test]
fn parse_stage_one_output_accepts_legacy_keys() {
fn parse_stage_one_output_rejects_legacy_keys() {
let raw = r#"{"rawMemory":"abc","summary":"short"}"#;
let parsed = parse_stage_one_output(raw).expect("parsed");
assert!(parsed.raw_memory.contains("abc"));
assert_eq!(parsed.rollout_summary, "short");
assert_eq!(parsed.rollout_slug, None);
assert!(parse_stage_one_output(raw).is_err());
}
#[test]
@ -194,7 +162,7 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
thread_id: ThreadId::try_from(keep_id.clone()).expect("thread id"),
source_updated_at: Utc.timestamp_opt(100, 0).single().expect("timestamp"),
raw_memory: "raw memory".to_string(),
summary: "short summary".to_string(),
rollout_summary: "short summary".to_string(),
generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
}];
@ -216,13 +184,12 @@ async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only
}
#[tokio::test]
async fn wipe_consolidation_outputs_removes_registry_skills_and_legacy_file() {
async fn wipe_consolidation_outputs_removes_registry_and_skills() {
let dir = tempdir().expect("tempdir");
let root = dir.path().join("memory");
ensure_layout(&root).await.expect("ensure layout");
let memory_registry = root.join("MEMORY.md");
let legacy_consolidated = root.join("consolidated.md");
let skills_dir = root.join("skills").join("example");
tokio::fs::create_dir_all(&skills_dir)
@ -231,15 +198,11 @@ async fn wipe_consolidation_outputs_removes_registry_skills_and_legacy_file() {
tokio::fs::write(&memory_registry, "memory")
.await
.expect("write memory registry");
tokio::fs::write(&legacy_consolidated, "legacy")
.await
.expect("write legacy consolidated");
wipe_consolidation_outputs(&root)
.await
.expect("wipe consolidation outputs");
assert!(!memory_registry.exists());
assert!(!legacy_consolidated.exists());
assert!(!root.join("skills").exists());
}

View file

@ -1,15 +0,0 @@
use serde::Deserialize;
/// Parsed stage-1 model output payload.
#[derive(Debug, Clone, Deserialize)]
pub(super) struct StageOneOutput {
/// Detailed markdown raw memory for a single rollout.
#[serde(rename = "raw_memory", alias = "rawMemory", alias = "traceMemory")]
pub(super) raw_memory: String,
/// Optional rollout slug from the model output. Accepted but ignored.
#[serde(default)]
pub(super) rollout_slug: Option<String>,
/// Compact summary line used for routing and indexing.
#[serde(rename = "rollout_summary", alias = "summary")]
pub(super) rollout_summary: String,
}

View file

@ -19,7 +19,7 @@ Consolidation goals:
4. Deduplicate aggressively and remove generic advice.
Expected outputs for this directory (create/update as needed):
- `MEMORY.md`: merged durable memory registry for this CWD.
- `MEMORY.md`: merged durable memory registry for this shared memory root.
- `skills/<skill-name>/...`: optional skill folders when there is clear reusable procedure value.
Do not rewrite phase-1 artifacts except when adding explicit cross-references:

View file

@ -1,4 +1,4 @@
Analyze this rollout and produce `raw_memory`, `rollout_summary`, and optional `rollout_slug` as JSON.
Analyze this rollout and produce `raw_memory` and `rollout_summary` as JSON.
rollout_context:
- rollout_path: {{ rollout_path }}

View file

@ -4,7 +4,6 @@ You are given one rollout and must produce exactly one JSON object.
Return exactly one JSON object with this schema:
- raw_memory: a detailed markdown raw memory for this rollout only.
- rollout_summary: a concise summary suitable for shared memory aggregation.
- rollout_slug: optional stable slug for the rollout (accepted but currently ignored).
Input contract:
- The user message contains:

View file

@ -1,22 +1,16 @@
DROP TABLE IF EXISTS thread_memory;
DROP TABLE IF EXISTS memory_phase1_jobs;
DROP TABLE IF EXISTS memory_scope_dirty;
DROP TABLE IF EXISTS memory_phase2_jobs;
DROP TABLE IF EXISTS memory_consolidation_locks;
CREATE TABLE IF NOT EXISTS stage1_outputs (
CREATE TABLE stage1_outputs (
thread_id TEXT PRIMARY KEY,
source_updated_at INTEGER NOT NULL,
raw_memory TEXT NOT NULL,
summary TEXT NOT NULL,
rollout_summary TEXT NOT NULL,
generated_at INTEGER NOT NULL,
FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_stage1_outputs_source_updated_at
CREATE INDEX idx_stage1_outputs_source_updated_at
ON stage1_outputs(source_updated_at DESC, thread_id DESC);
CREATE TABLE IF NOT EXISTS jobs (
CREATE TABLE jobs (
kind TEXT NOT NULL,
job_key TEXT NOT NULL,
status TEXT NOT NULL,
@ -33,5 +27,5 @@ CREATE TABLE IF NOT EXISTS jobs (
PRIMARY KEY (kind, job_key)
);
CREATE INDEX IF NOT EXISTS idx_jobs_kind_status_retry_lease
CREATE INDEX idx_jobs_kind_status_retry_lease
ON jobs(kind, status, retry_at, lease_until);
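The rewritten migration folds the per-phase tables into one generic `jobs` table keyed by `(kind, job_key)`, and drops the `IF NOT EXISTS` guards (redundant when each migration script runs exactly once). The new index lines up with the claim path; a hypothetical claim query, with column semantics inferred from the index rather than taken from the crate's actual SQL:

// Assumed to be in scope: `now`, `lease_seconds`, `job_key`, `tx`.
let claimed = sqlx::query(
    r#"
UPDATE jobs
SET status = 'running', lease_until = ?
WHERE kind = 'memory_stage1' AND job_key = ?
  AND status = 'pending'
  AND (retry_at IS NULL OR retry_at <= ?)
  AND (lease_until IS NULL OR lease_until <= ?)
"#,
)
.bind(now + lease_seconds)
.bind(job_key)
.bind(now)
.bind(now)
.execute(&mut *tx)
.await?
.rows_affected()
    > 0;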

View file

@ -1,9 +0,0 @@
CREATE TABLE thread_memory (
thread_id TEXT PRIMARY KEY,
trace_summary TEXT NOT NULL,
memory_summary TEXT NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
);
CREATE INDEX idx_thread_memory_updated_at ON thread_memory(updated_at DESC, thread_id DESC);

View file

@ -1,8 +0,0 @@
CREATE TABLE memory_consolidation_locks (
cwd TEXT PRIMARY KEY,
working_thread_id TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX idx_memory_consolidation_locks_updated_at
ON memory_consolidation_locks(updated_at DESC);

View file

@ -1,85 +0,0 @@
DROP TABLE IF EXISTS thread_memory;
DROP TABLE IF EXISTS memory_consolidation_locks;
DROP TABLE IF EXISTS memory_phase1_jobs;
DROP TABLE IF EXISTS memory_scope_dirty;
DROP TABLE IF EXISTS memory_phase2_jobs;
CREATE TABLE thread_memory (
thread_id TEXT NOT NULL,
scope_kind TEXT NOT NULL,
scope_key TEXT NOT NULL,
raw_memory TEXT NOT NULL,
memory_summary TEXT NOT NULL,
updated_at INTEGER NOT NULL,
last_used_at INTEGER,
used_count INTEGER NOT NULL DEFAULT 0,
invalidated_at INTEGER,
invalid_reason TEXT,
PRIMARY KEY (thread_id, scope_kind, scope_key),
FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
);
CREATE INDEX idx_thread_memory_scope_last_used_at
ON thread_memory(scope_kind, scope_key, last_used_at DESC, thread_id DESC);
CREATE INDEX idx_thread_memory_scope_updated_at
ON thread_memory(scope_kind, scope_key, updated_at DESC, thread_id DESC);
CREATE TABLE memory_phase1_jobs (
thread_id TEXT NOT NULL,
scope_kind TEXT NOT NULL,
scope_key TEXT NOT NULL,
status TEXT NOT NULL,
owner_session_id TEXT,
started_at INTEGER,
finished_at INTEGER,
failure_reason TEXT,
source_updated_at INTEGER NOT NULL,
raw_memory_path TEXT,
summary_hash TEXT,
ownership_token TEXT,
PRIMARY KEY (thread_id, scope_kind, scope_key),
FOREIGN KEY(thread_id) REFERENCES threads(id) ON DELETE CASCADE
);
CREATE INDEX idx_memory_phase1_jobs_status_started_at
ON memory_phase1_jobs(status, started_at DESC);
CREATE INDEX idx_memory_phase1_jobs_scope
ON memory_phase1_jobs(scope_kind, scope_key);
CREATE TABLE memory_scope_dirty (
scope_kind TEXT NOT NULL,
scope_key TEXT NOT NULL,
dirty INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
PRIMARY KEY (scope_kind, scope_key)
);
CREATE INDEX idx_memory_scope_dirty_dirty
ON memory_scope_dirty(dirty, updated_at DESC);
CREATE TABLE memory_phase2_jobs (
scope_kind TEXT NOT NULL,
scope_key TEXT NOT NULL,
status TEXT NOT NULL,
owner_session_id TEXT,
agent_thread_id TEXT,
started_at INTEGER,
last_heartbeat_at INTEGER,
finished_at INTEGER,
attempt INTEGER NOT NULL DEFAULT 0,
failure_reason TEXT,
ownership_token TEXT,
PRIMARY KEY (scope_kind, scope_key)
);
CREATE INDEX idx_memory_phase2_jobs_status_heartbeat
ON memory_phase2_jobs(status, last_heartbeat_at DESC);
CREATE TABLE memory_consolidation_locks (
cwd TEXT PRIMARY KEY,
working_thread_id TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX idx_memory_consolidation_locks_updated_at
ON memory_consolidation_locks(updated_at DESC);

View file

@ -14,6 +14,7 @@ mod runtime;
pub use model::LogEntry;
pub use model::LogQuery;
pub use model::LogRow;
pub use model::Phase2JobClaimOutcome;
/// Preferred entrypoint: owns configuration and metrics.
pub use runtime::StateRuntime;
@ -27,16 +28,15 @@ pub use model::BackfillStats;
pub use model::BackfillStatus;
pub use model::ExtractionOutcome;
pub use model::SortKey;
pub use model::Stage1JobClaim;
pub use model::Stage1JobClaimOutcome;
pub use model::Stage1Output;
pub use model::Stage1StartupClaimParams;
pub use model::ThreadMetadata;
pub use model::ThreadMetadataBuilder;
pub use model::ThreadsPage;
pub use runtime::Phase2JobClaimOutcome;
pub use runtime::STATE_DB_FILENAME;
pub use runtime::STATE_DB_VERSION;
pub use runtime::Stage1JobClaim;
pub use runtime::Stage1JobClaimOutcome;
pub use runtime::Stage1StartupClaimParams;
pub use runtime::state_db_filename;
pub use runtime::state_db_path;

View file

@ -0,0 +1,105 @@
use anyhow::Result;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
use sqlx::Row;
use sqlx::sqlite::SqliteRow;
use super::ThreadMetadata;
/// Stored stage-1 memory extraction output for a single thread.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stage1Output {
pub thread_id: ThreadId,
pub source_updated_at: DateTime<Utc>,
pub raw_memory: String,
pub rollout_summary: String,
pub generated_at: DateTime<Utc>,
}
#[derive(Debug)]
pub(crate) struct Stage1OutputRow {
thread_id: String,
source_updated_at: i64,
raw_memory: String,
rollout_summary: String,
generated_at: i64,
}
impl Stage1OutputRow {
pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
Ok(Self {
thread_id: row.try_get("thread_id")?,
source_updated_at: row.try_get("source_updated_at")?,
raw_memory: row.try_get("raw_memory")?,
rollout_summary: row.try_get("rollout_summary")?,
generated_at: row.try_get("generated_at")?,
})
}
}
impl TryFrom<Stage1OutputRow> for Stage1Output {
type Error = anyhow::Error;
fn try_from(row: Stage1OutputRow) -> std::result::Result<Self, Self::Error> {
Ok(Self {
thread_id: ThreadId::try_from(row.thread_id)?,
source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?,
raw_memory: row.raw_memory,
rollout_summary: row.rollout_summary,
generated_at: epoch_seconds_to_datetime(row.generated_at)?,
})
}
}
fn epoch_seconds_to_datetime(secs: i64) -> Result<DateTime<Utc>> {
DateTime::<Utc>::from_timestamp(secs, 0)
.ok_or_else(|| anyhow::anyhow!("invalid unix timestamp: {secs}"))
}
/// Result of trying to claim a stage-1 memory extraction job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Stage1JobClaimOutcome {
/// The caller owns the job and should continue with extraction.
Claimed { ownership_token: String },
/// Existing output is already newer than or equal to the source rollout.
SkippedUpToDate,
/// Another worker currently owns a fresh lease for this job.
SkippedRunning,
/// The job is in backoff and should not be retried yet.
SkippedRetryBackoff,
/// The job has exhausted retries and should not be retried automatically.
SkippedRetryExhausted,
}
/// Claimed stage-1 job with thread metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stage1JobClaim {
pub thread: ThreadMetadata,
pub ownership_token: String,
}
#[derive(Debug, Clone, Copy)]
pub struct Stage1StartupClaimParams<'a> {
pub scan_limit: usize,
pub max_claimed: usize,
pub max_age_days: i64,
pub min_rollout_idle_hours: i64,
pub allowed_sources: &'a [String],
pub lease_seconds: i64,
}
/// Result of trying to claim a phase-2 consolidation job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Phase2JobClaimOutcome {
/// The caller owns the global lock and should spawn consolidation.
Claimed {
ownership_token: String,
/// Snapshot of `input_watermark` at claim time.
input_watermark: i64,
},
/// The global job is not pending consolidation (or is already up to date).
SkippedNotDirty,
/// Another worker currently owns a fresh global consolidation lease.
SkippedRunning,
}
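These claim-outcome enums make scheduling decisions explicit at call sites. A sketch of how a phase-2 caller might branch (`spawn_consolidation` is illustrative):

match outcome {
    Phase2JobClaimOutcome::Claimed { ownership_token, input_watermark } => {
        // We hold the global lease: spawn one consolidation agent. The
        // token fences stale writers; the watermark records which
        // stage-1 inputs this run covers.
        spawn_consolidation(ownership_token, input_watermark);
    }
    Phase2JobClaimOutcome::SkippedNotDirty | Phase2JobClaimOutcome::SkippedRunning => {
        // Nothing to do: no new inputs, or another worker holds a fresh lease.
    }
}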

View file

@ -1,6 +1,6 @@
mod backfill_state;
mod log;
mod stage1_output;
mod memories;
mod thread_metadata;
pub use backfill_state::BackfillState;
@ -8,7 +8,11 @@ pub use backfill_state::BackfillStatus;
pub use log::LogEntry;
pub use log::LogQuery;
pub use log::LogRow;
pub use stage1_output::Stage1Output;
pub use memories::Phase2JobClaimOutcome;
pub use memories::Stage1JobClaim;
pub use memories::Stage1JobClaimOutcome;
pub use memories::Stage1Output;
pub use memories::Stage1StartupClaimParams;
pub use thread_metadata::Anchor;
pub use thread_metadata::BackfillStats;
pub use thread_metadata::ExtractionOutcome;
@ -17,7 +21,7 @@ pub use thread_metadata::ThreadMetadata;
pub use thread_metadata::ThreadMetadataBuilder;
pub use thread_metadata::ThreadsPage;
pub(crate) use stage1_output::Stage1OutputRow;
pub(crate) use memories::Stage1OutputRow;
pub(crate) use thread_metadata::ThreadRow;
pub(crate) use thread_metadata::anchor_from_item;
pub(crate) use thread_metadata::datetime_to_epoch_seconds;

View file

@ -1,56 +0,0 @@
use anyhow::Result;
use chrono::DateTime;
use chrono::Utc;
use codex_protocol::ThreadId;
use sqlx::Row;
use sqlx::sqlite::SqliteRow;
/// Stored stage-1 memory extraction output for a single thread.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stage1Output {
pub thread_id: ThreadId,
pub source_updated_at: DateTime<Utc>,
pub raw_memory: String,
pub summary: String,
pub generated_at: DateTime<Utc>,
}
#[derive(Debug)]
pub(crate) struct Stage1OutputRow {
thread_id: String,
source_updated_at: i64,
raw_memory: String,
summary: String,
generated_at: i64,
}
impl Stage1OutputRow {
pub(crate) fn try_from_row(row: &SqliteRow) -> Result<Self> {
Ok(Self {
thread_id: row.try_get("thread_id")?,
source_updated_at: row.try_get("source_updated_at")?,
raw_memory: row.try_get("raw_memory")?,
summary: row.try_get("summary")?,
generated_at: row.try_get("generated_at")?,
})
}
}
impl TryFrom<Stage1OutputRow> for Stage1Output {
type Error = anyhow::Error;
fn try_from(row: Stage1OutputRow) -> std::result::Result<Self, Self::Error> {
Ok(Self {
thread_id: ThreadId::try_from(row.thread_id)?,
source_updated_at: epoch_seconds_to_datetime(row.source_updated_at)?,
raw_memory: row.raw_memory,
summary: row.summary,
generated_at: epoch_seconds_to_datetime(row.generated_at)?,
})
}
}
fn epoch_seconds_to_datetime(secs: i64) -> Result<DateTime<Utc>> {
DateTime::<Utc>::from_timestamp(secs, 0)
.ok_or_else(|| anyhow::anyhow!("invalid unix timestamp: {secs}"))
}

View file

@ -41,8 +41,8 @@ pub const STATE_DB_VERSION: u32 = 4;
const METRIC_DB_INIT: &str = "codex.db.init";
mod memory;
// Memory-specific CRUD and phase job lifecycle methods live in `runtime/memory.rs`.
mod memories;
// Memory-specific CRUD and phase job lifecycle methods live in `runtime/memories.rs`.
#[derive(Clone)]
pub struct StateRuntime {
@ -51,53 +51,6 @@ pub struct StateRuntime {
pool: Arc<sqlx::SqlitePool>,
}
/// Result of trying to claim a stage-1 memory extraction job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Stage1JobClaimOutcome {
/// The caller owns the job and should continue with extraction.
Claimed { ownership_token: String },
/// Existing output is already newer than or equal to the source rollout.
SkippedUpToDate,
/// Another worker currently owns a fresh lease for this job.
SkippedRunning,
/// The job is in backoff and should not be retried yet.
SkippedRetryBackoff,
/// The job has exhausted retries and should not be retried automatically.
SkippedRetryExhausted,
}
/// Claimed stage-1 job with thread metadata.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Stage1JobClaim {
pub thread: ThreadMetadata,
pub ownership_token: String,
}
#[derive(Debug, Clone, Copy)]
pub struct Stage1StartupClaimParams<'a> {
pub scan_limit: usize,
pub max_claimed: usize,
pub max_age_days: i64,
pub min_rollout_idle_hours: i64,
pub allowed_sources: &'a [String],
pub lease_seconds: i64,
}
/// Result of trying to claim a phase-2 consolidation job.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Phase2JobClaimOutcome {
/// The caller owns the global lock and should spawn consolidation.
Claimed {
ownership_token: String,
/// Snapshot of `input_watermark` at claim time.
input_watermark: i64,
},
/// The global job is not pending consolidation (or is already up to date).
SkippedNotDirty,
/// Another worker currently owns a fresh global consolidation lease.
SkippedRunning,
}
impl StateRuntime {
/// Initialize the state runtime using the provided Codex home and default provider.
///
@ -914,14 +867,14 @@ fn push_thread_order_and_limit(
#[cfg(test)]
mod tests {
use super::Phase2JobClaimOutcome;
use super::STATE_DB_FILENAME;
use super::STATE_DB_VERSION;
use super::Stage1JobClaimOutcome;
use super::Stage1StartupClaimParams;
use super::StateRuntime;
use super::ThreadMetadata;
use super::state_db_filename;
use crate::model::Phase2JobClaimOutcome;
use crate::model::Stage1JobClaimOutcome;
use crate::model::Stage1StartupClaimParams;
use chrono::DateTime;
use chrono::Duration;
use chrono::Utc;
@ -1582,9 +1535,9 @@ WHERE kind = 'memory_stage1'
.expect("list stage1 outputs for global");
assert_eq!(outputs.len(), 2);
assert_eq!(outputs[0].thread_id, thread_id_b);
assert_eq!(outputs[0].summary, "summary b");
assert_eq!(outputs[0].rollout_summary, "summary b");
assert_eq!(outputs[1].thread_id, thread_id_a);
assert_eq!(outputs[1].summary, "summary a");
assert_eq!(outputs[1].rollout_summary, "summary a");
let _ = tokio::fs::remove_dir_all(codex_home).await;
}

View file

@ -1,6 +1,10 @@
use super::*;
use crate::Stage1Output;
use crate::model::Phase2JobClaimOutcome;
use crate::model::Stage1JobClaim;
use crate::model::Stage1JobClaimOutcome;
use crate::model::Stage1Output;
use crate::model::Stage1OutputRow;
use crate::model::Stage1StartupClaimParams;
use crate::model::ThreadRow;
use chrono::Duration;
use sqlx::Executor;
@ -117,7 +121,7 @@ FROM threads
) -> anyhow::Result<Option<Stage1Output>> {
let row = sqlx::query(
r#"
SELECT thread_id, source_updated_at, raw_memory, summary, generated_at
SELECT thread_id, source_updated_at, raw_memory, rollout_summary, generated_at
FROM stage1_outputs
WHERE thread_id = ?
"#,
@ -140,9 +144,8 @@ WHERE thread_id = ?
let rows = sqlx::query(
r#"
SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.summary, so.generated_at
SELECT so.thread_id, so.source_updated_at, so.raw_memory, so.rollout_summary, so.generated_at
FROM stage1_outputs AS so
JOIN threads AS t ON t.id = so.thread_id
ORDER BY so.source_updated_at DESC, so.thread_id DESC
LIMIT ?
"#,
@ -329,7 +332,7 @@ WHERE kind = ? AND job_key = ?
ownership_token: &str,
source_updated_at: i64,
raw_memory: &str,
summary: &str,
rollout_summary: &str,
) -> anyhow::Result<bool> {
let now = Utc::now().timestamp();
let thread_id = thread_id.to_string();
@ -367,13 +370,13 @@ INSERT INTO stage1_outputs (
thread_id,
source_updated_at,
raw_memory,
summary,
rollout_summary,
generated_at
) VALUES (?, ?, ?, ?, ?)
ON CONFLICT(thread_id) DO UPDATE SET
source_updated_at = excluded.source_updated_at,
raw_memory = excluded.raw_memory,
summary = excluded.summary,
rollout_summary = excluded.rollout_summary,
generated_at = excluded.generated_at
WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
"#,
@ -381,7 +384,7 @@ WHERE excluded.source_updated_at >= stage1_outputs.source_updated_at
.bind(thread_id.as_str())
.bind(source_updated_at)
.bind(raw_memory)
.bind(summary)
.bind(rollout_summary)
.bind(now)
.execute(&mut *tx)
.await?;
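The `ON CONFLICT` guard makes this upsert last-writer-wins by source timestamp: `excluded` refers to the row the `INSERT` attempted, so a stale re-extraction with an older `source_updated_at` hits the conflict branch, fails the `WHERE` clause, and leaves the stored row untouched. The same SQLite idiom on a toy table (names hypothetical):

// Insert succeeds outright, or updates only if the incoming version
// (`excluded.ver`) is at least the stored one.
sqlx::query(
    r#"
INSERT INTO kv (k, v, ver) VALUES (?, ?, ?)
ON CONFLICT(k) DO UPDATE SET v = excluded.v, ver = excluded.ver
WHERE excluded.ver >= kv.ver
"#,
)
.bind("a")
.bind("newer")
.bind(2_i64)
.execute(&pool)
.await?;
// Re-running with ver = 1 triggers the conflict but fails the guard,
// so the stored row keeps ver = 2.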