feat: mem v2 - PR2 (#11365)
# Memories migration plan (simplified global workflow)

## Target behavior

- One shared memory root only: `~/.codex/memories/`.
- No per-cwd memory buckets, no cwd hash handling.
- Phase-1 candidate rules:
  - Not currently being processed unless the job lease is stale.
  - Rollout updated within the max-age window (currently 30 days).
  - Rollout idle for at least 12 hours (new constant).
- Global cap: at most 64 stage-1 jobs in `running` state at any time (new invariant).
- Stage-1 model output shape (new):
  - `rollout_slug` (accepted but ignored for now).
  - `rollout_summary`.
  - `raw_memory`.
- Phase-1 artifacts written under the shared root:
  - `rollout_summaries/<thread_id>.md` for each rollout summary.
  - `raw_memories.md` containing appended/merged raw memory paragraphs.
- Phase 2 runs one consolidation agent for the shared `memories/` directory.
- The phase-2 lock is DB-backed with a 1-hour lease and heartbeat/expiry.

## Current code map

- Core startup pipeline: `core/src/memories/startup/mod.rs`.
- Stage-1 request + parse: `core/src/memories/startup/extract.rs`, `core/src/memories/stage_one.rs`, templates in `core/templates/memories/`.
- File materialization: `core/src/memories/storage.rs`, `core/src/memories/layout.rs`.
- Scope routing (cwd/user): `core/src/memories/scope.rs`, `core/src/memories/startup/mod.rs`.
- DB job lifecycle and scope queueing: `state/src/runtime/memory.rs`.

## PR plan

## PR 1: Correct phase-1 selection invariants (no behavior-breaking layout changes yet)

- Add `PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: i64 = 12` in `core/src/memories/mod.rs`.
- Thread this into `state::claim_stage1_jobs_for_startup(...)`.
- Enforce the idle-time filter in the DB selection logic (not only via in-memory filtering after `scan_limit`) so eligible threads are not starved by very recent threads.
- Enforce the global running cap of 64 at claim time in the DB logic:
  - Count fresh `memory_stage1` running jobs.
  - Only allow new claims while the count is below the cap.
- Keep stale-lease takeover behavior intact.
- Add/adjust tests in `state/src/runtime.rs`:
  - Idle-filter inclusion/exclusion around the 12h boundary.
  - Global running-cap guarantee.
  - Existing stale/fresh ownership behavior still passes.

Acceptance criteria:

- Startup never creates more than 64 fresh `memory_stage1` running jobs.
- Threads updated less than 12h ago are skipped.
- Threads older than 30d are skipped.

## PR 2: Stage-1 output contract + storage artifacts (forward-compatible)

- Update the parser/types to accept the new structured output while keeping backward compatibility:
  - Add `rollout_slug` (optional for now).
  - Add `rollout_summary`.
  - Keep alias support for the legacy `summary` and `rawMemory` keys until the prompt swap completes.
- Update the stage-1 schema generator in `core/src/memories/stage_one.rs` to include the new keys.
- Update the prompt templates:
  - `core/templates/memories/stage_one_system.md`.
  - `core/templates/memories/stage_one_input.md`.
- Replace the storage model in `core/src/memories/storage.rs`:
  - Introduce a `rollout_summaries/` directory writer (`<thread_id>.md` files).
  - Introduce a `raw_memories.md` aggregator writer fed from DB rows.
  - Keep deterministic rebuild behavior from DB outputs so files can always be regenerated.
- Update the consolidation prompt template to reference the `rollout_summaries/` + `raw_memories.md` inputs.

Acceptance criteria:

- Stage-1 accepts both old and new output keys during migration.
- Phase-1 artifacts are generated in the new format from DB state.
- No dependence on per-thread files in `raw_memories/`.

## PR 3: Remove per-cwd memories and move to one global memory root

- Simplify the layout in `core/src/memories/layout.rs`:
  - Single root: `codex_home/memories`.
  - Remove the cwd-hash bucket helpers and normalization logic used only for memory pathing.
- Remove scope branching from the startup phase-2 dispatch path:
  - No cwd/user mapping in `core/src/memories/startup/mod.rs`.
  - One target root for consolidation.
- In `state/src/runtime/memory.rs`, stop enqueueing/handling the cwd consolidation scope.
- Keep one logical consolidation scope/job key (global/user) to avoid a risky schema rewrite in the same PR.
- Add a one-time migration helper (core side) to preserve the current shared memory output:
  - If `~/.codex/memories/user/memory` exists and the new root is empty, move/copy its contents into `~/.codex/memories`.
  - Leave the old hashed cwd buckets untouched for now (safe, non-destructive migration).

Acceptance criteria:

- New runs only read/write `~/.codex/memories`.
- No new cwd-scoped consolidation jobs are enqueued.
- Existing user-shared memory content is preserved.

## PR 4: Phase-2 global lock simplification and cleanup

- Replace the multi-scope dispatch with a single global consolidation claim path:
  - Either reuse the jobs table with one fixed key or add a tiny dedicated lock helper; keep the 1h lease.
  - Ensure at most one consolidation agent can run at once.
- Keep the heartbeat + stale-lock recovery semantics in `core/src/memories/startup/watch.rs`.
- Remove dead scope code and legacy constants that are no longer used.
- Update tests:
  - One-agent-at-a-time behavior.
  - Lock expiry allows takeover after a stale lease.

Acceptance criteria:

- Exactly one phase-2 consolidation agent can be active at a time (per local DB).
- A stale lock recovers automatically.

## PR 5: Final cleanup and docs

- Remove legacy artifacts and references:
  - `raw_memories/` and `memory_summary.md` assumptions from prompts/comments/tests.
  - Scope constants for cwd memory pathing in core/state, if fully unused.
- Update the docs under `docs/` for the memory workflow and directory layout.
- Add a brief operator note for rollout: the compatibility window for the old stage-1 JSON keys and when to remove the aliases.

Acceptance criteria:

- Code and docs reflect only the simplified global workflow.
- No stale references to per-cwd memory buckets.

## Notes on sequencing

- PR 1 is safest first because it improves correctness without changing the external artifact layout.
- PR 2 keeps parser compatibility so prompt deployment can happen independently.
- PR 3 and PR 4 split the filesystem/scope simplification from the locking simplification to reduce blast radius.
- PR 5 is intentionally cleanup-only.
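The PR 1 selection invariants described above (12-hour idle floor, 30-day age ceiling, 64-job running cap) can be sketched as a single claim-eligibility predicate. This is an illustrative sketch only; the function and constant names here are hypothetical and the real enforcement happens inside the DB selection logic in the `state` crate, not in a pure helper:

```rust
use std::time::{Duration, SystemTime};

// Constants mirror the plan; only PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS is named in the diff.
const PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: u64 = 12;
const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: u64 = 30;
const PHASE_ONE_MAX_RUNNING_JOBS: usize = 64;

/// Hypothetical predicate: may a rollout be claimed for a stage-1 job?
/// `updated_at` is the rollout's last update time; `running_jobs` is the
/// current count of fresh `memory_stage1` jobs in `running` state.
fn stage1_claim_allowed(updated_at: SystemTime, now: SystemTime, running_jobs: usize) -> bool {
    if running_jobs >= PHASE_ONE_MAX_RUNNING_JOBS {
        return false; // global running cap reached; no new claims
    }
    let Ok(idle) = now.duration_since(updated_at) else {
        return false; // updated_at is in the future; treat as not idle
    };
    // Idle at least 12h, but still inside the 30-day max-age window.
    idle >= Duration::from_secs(PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS * 3600)
        && idle <= Duration::from_secs(PHASE_ONE_MAX_ROLLOUT_AGE_DAYS * 24 * 3600)
}
```

Stale-lease takeover is deliberately outside this predicate: a stale `running` job does not count against the cap's "fresh" jobs and may be reclaimed regardless.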
parent 07da740c8a
commit 0229dc5ccf

10 changed files with 136 additions and 145 deletions
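Before reading the parser diff: the backward-compatibility behavior PR 2 targets is "prefer the new snake_case key, fall back to the legacy key". The actual code does this with serde `rename`/`alias` attributes (visible in the `StageOneOutput` hunk below); this dependency-free sketch expresses the same fallback over a parsed key/value map, with a hypothetical `field` helper:

```rust
use std::collections::HashMap;

/// Hypothetical helper: resolve a stage-1 output field, preferring the new
/// key and falling back to the legacy key so old prompt deployments keep
/// working during the migration window.
fn field<'a>(obj: &'a HashMap<String, String>, new_key: &str, legacy_key: &str) -> Option<&'a str> {
    obj.get(new_key)
        .or_else(|| obj.get(legacy_key))
        .map(String::as_str)
}
```

Once the prompt swap completes and the compatibility window closes (PR 5), the legacy-key fallback can be dropped.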
@@ -7,8 +7,8 @@ use std::path::PathBuf;
 use super::scope::MEMORY_SCOPE_KEY_USER;

 pub(super) const MEMORY_SUBDIR: &str = "memory";
-pub(super) const RAW_MEMORIES_SUBDIR: &str = "raw_memories";
-pub(super) const MEMORY_SUMMARY_FILENAME: &str = "memory_summary.md";
+pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries";
+pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md";
 pub(super) const MEMORY_REGISTRY_FILENAME: &str = "MEMORY.md";
 pub(super) const LEGACY_CONSOLIDATED_FILENAME: &str = "consolidated.md";
 pub(super) const SKILLS_SUBDIR: &str = "skills";
@@ -32,17 +32,17 @@ pub(super) fn memory_root_for_user(codex_home: &Path) -> PathBuf {
         .join(MEMORY_SUBDIR)
 }

-pub(super) fn raw_memories_dir(root: &Path) -> PathBuf {
-    root.join(RAW_MEMORIES_SUBDIR)
+pub(super) fn rollout_summaries_dir(root: &Path) -> PathBuf {
+    root.join(ROLLOUT_SUMMARIES_SUBDIR)
 }

-pub(super) fn memory_summary_file(root: &Path) -> PathBuf {
-    root.join(MEMORY_SUMMARY_FILENAME)
+pub(super) fn raw_memories_file(root: &Path) -> PathBuf {
+    root.join(RAW_MEMORIES_FILENAME)
 }

 /// Ensures the phase-1 memory directory layout exists for the given root.
 pub(super) async fn ensure_layout(root: &Path) -> std::io::Result<()> {
-    tokio::fs::create_dir_all(raw_memories_dir(root)).await
+    tokio::fs::create_dir_all(rollout_summaries_dir(root)).await
 }

 fn memory_bucket_for_cwd(cwd: &Path) -> String {
@@ -28,10 +28,11 @@ pub(super) fn stage_one_output_schema() -> Value {
     json!({
         "type": "object",
         "properties": {
-            "rawMemory": { "type": "string" },
-            "summary": { "type": "string" }
+            "rollout_slug": { "type": "string" },
+            "rollout_summary": { "type": "string" },
+            "raw_memory": { "type": "string" }
         },
-        "required": ["rawMemory", "summary"],
+        "required": ["rollout_summary", "raw_memory"],
         "additionalProperties": false
     })
 }
@@ -95,21 +96,27 @@ fn parse_json_object_loose(raw: &str) -> Result<Value> {

 fn normalize_stage_one_output(mut output: StageOneOutput) -> Result<StageOneOutput> {
     output.raw_memory = output.raw_memory.trim().to_string();
-    output.summary = output.summary.trim().to_string();
+    output.rollout_summary = output.rollout_summary.trim().to_string();
+    if let Some(slug) = output.rollout_slug.take() {
+        let slug = slug.trim();
+        if !slug.is_empty() {
+            output.rollout_slug = Some(slug.to_string());
+        }
+    }

     if output.raw_memory.is_empty() {
         return Err(CodexErr::InvalidRequest(
-            "stage-1 memory output missing rawMemory".to_string(),
+            "stage-1 memory output missing raw_memory".to_string(),
         ));
     }
-    if output.summary.is_empty() {
+    if output.rollout_summary.is_empty() {
         return Err(CodexErr::InvalidRequest(
-            "stage-1 memory output missing summary".to_string(),
+            "stage-1 memory output missing rollout_summary".to_string(),
         ));
     }

     output.raw_memory = normalize_raw_memory_structure(&redact_secrets(&output.raw_memory));
-    output.summary = redact_secrets(&compact_whitespace(&output.summary));
+    output.rollout_summary = redact_secrets(&compact_whitespace(&output.rollout_summary));

     if output.raw_memory.len() > MAX_STAGE_ONE_RAW_MEMORY_CHARS {
         output.raw_memory = truncate_text_for_storage(
@@ -119,9 +126,9 @@ fn normalize_stage_one_output(mut output: StageOneOutput) -> Result<StageOneOutp
         );
     }

-    if output.summary.len() > MAX_STAGE_ONE_SUMMARY_CHARS {
-        output.summary = truncate_text_for_storage(
-            &output.summary,
+    if output.rollout_summary.len() > MAX_STAGE_ONE_SUMMARY_CHARS {
+        output.rollout_summary = truncate_text_for_storage(
+            &output.rollout_summary,
             MAX_STAGE_ONE_SUMMARY_CHARS,
             " [...summary truncated...]",
         );
@@ -188,14 +195,18 @@ mod tests {
     fn normalize_stage_one_output_redacts_and_compacts_summary() {
        let output = StageOneOutput {
            raw_memory: "Token: sk-abcdefghijklmnopqrstuvwxyz123456\nBearer abcdefghijklmnopqrstuvwxyz012345".to_string(),
-            summary: "password = mysecret123456\n\nsmall".to_string(),
+            rollout_slug: None,
+            rollout_summary: "password = mysecret123456\n\nsmall".to_string(),
         };

         let normalized = normalize_stage_one_output(output).expect("normalized");

         assert!(normalized.raw_memory.contains("[REDACTED_SECRET]"));
-        assert!(!normalized.summary.contains("mysecret123456"));
-        assert_eq!(normalized.summary, "password = [REDACTED_SECRET] small");
+        assert!(!normalized.rollout_summary.contains("mysecret123456"));
+        assert_eq!(
+            normalized.rollout_summary,
+            "password = [REDACTED_SECRET] small"
+        );
     }

     #[test]
@@ -13,8 +13,8 @@ use super::super::MEMORY_CONSOLIDATION_SUBAGENT_LABEL;
 use super::super::PHASE_TWO_JOB_LEASE_SECONDS;
 use super::super::PHASE_TWO_JOB_RETRY_DELAY_SECONDS;
 use super::super::prompts::build_consolidation_prompt;
-use super::super::storage::rebuild_memory_summary_from_memories;
-use super::super::storage::sync_raw_memories_from_memories;
+use super::super::storage::rebuild_raw_memories_file_from_memories;
+use super::super::storage::sync_rollout_summaries_from_memories;
 use super::super::storage::wipe_consolidation_outputs;
 use super::MemoryScopeTarget;
 use super::watch::spawn_phase2_completion_task;
@@ -119,9 +119,11 @@ pub(super) async fn run_memory_consolidation_for_scope(
         .max()
         .unwrap_or(claimed_watermark);

-    if let Err(err) = sync_raw_memories_from_memories(&scope.memory_root, &latest_memories).await {
+    if let Err(err) =
+        sync_rollout_summaries_from_memories(&scope.memory_root, &latest_memories).await
+    {
         warn!(
-            "failed syncing phase-1 raw memories for scope {}:{}: {err}",
+            "failed syncing phase-1 rollout summaries for scope {}:{}: {err}",
             scope.scope_kind, scope.scope_key
         );
         let _ = state_db
@@ -129,7 +131,7 @@ pub(super) async fn run_memory_consolidation_for_scope(
                 scope.scope_kind,
                 &scope.scope_key,
                 &ownership_token,
-                "failed syncing phase-1 raw memories",
+                "failed syncing phase-1 rollout summaries",
                 PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
             )
             .await;
@@ -137,10 +139,10 @@ pub(super) async fn run_memory_consolidation_for_scope(
     }

     if let Err(err) =
-        rebuild_memory_summary_from_memories(&scope.memory_root, &latest_memories).await
+        rebuild_raw_memories_file_from_memories(&scope.memory_root, &latest_memories).await
     {
         warn!(
-            "failed rebuilding memory summary for scope {}:{}: {err}",
+            "failed rebuilding raw memories aggregate for scope {}:{}: {err}",
             scope.scope_kind, scope.scope_key
         );
         let _ = state_db
@@ -148,7 +150,7 @@ pub(super) async fn run_memory_consolidation_for_scope(
                 scope.scope_kind,
                 &scope.scope_key,
                 &ownership_token,
-                "failed rebuilding memory summary",
+                "failed rebuilding raw memories aggregate",
                 PHASE_TWO_JOB_RETRY_DELAY_SECONDS,
             )
             .await;
@@ -222,7 +222,7 @@ pub(super) async fn run_memories_startup_pipeline(
                 &claim.ownership_token,
                 thread.updated_at.timestamp(),
                 &stage_one_output.raw_memory,
-                &stage_one_output.summary,
+                &stage_one_output.rollout_summary,
             )
             .await
             .unwrap_or(false)
@@ -2,7 +2,6 @@ use codex_state::Stage1Output;
 use std::collections::BTreeSet;
 use std::fmt::Write as _;
 use std::path::Path;
-use std::path::PathBuf;
 use tracing::warn;

 use super::MAX_RAW_MEMORIES_PER_SCOPE;
@@ -11,20 +10,20 @@ use crate::memories::layout::LEGACY_CONSOLIDATED_FILENAME;
 use crate::memories::layout::MEMORY_REGISTRY_FILENAME;
 use crate::memories::layout::SKILLS_SUBDIR;
 use crate::memories::layout::ensure_layout;
-use crate::memories::layout::memory_summary_file;
-use crate::memories::layout::raw_memories_dir;
+use crate::memories::layout::raw_memories_file;
+use crate::memories::layout::rollout_summaries_dir;

-/// Rebuild `memory_summary.md` for a scope without pruning raw memory files.
-pub(super) async fn rebuild_memory_summary_from_memories(
+/// Rebuild `raw_memories.md` from DB-backed stage-1 outputs.
+pub(super) async fn rebuild_raw_memories_file_from_memories(
     root: &Path,
     memories: &[Stage1Output],
 ) -> std::io::Result<()> {
     ensure_layout(root).await?;
-    rebuild_memory_summary(root, memories).await
+    rebuild_raw_memories_file(root, memories).await
 }

-/// Syncs canonical raw memory files from DB-backed memory rows.
-pub(super) async fn sync_raw_memories_from_memories(
+/// Syncs canonical rollout summary files from DB-backed stage-1 output rows.
+pub(super) async fn sync_rollout_summaries_from_memories(
     root: &Path,
     memories: &[Stage1Output],
 ) -> std::io::Result<()> {
@@ -38,17 +37,17 @@ pub(super) async fn sync_raw_memories_from_memories(
         .iter()
         .map(|memory| memory.thread_id.to_string())
         .collect::<BTreeSet<_>>();
-    prune_raw_memories(root, &keep).await?;
+    prune_rollout_summaries(root, &keep).await?;

     for memory in retained {
-        write_raw_memory_for_thread(root, memory).await?;
+        write_rollout_summary_for_thread(root, memory).await?;
     }
     Ok(())
 }

 /// Clears consolidation outputs so a fresh consolidation run can regenerate them.
 ///
-/// Phase-1 artifacts (`raw_memories/` and `memory_summary.md`) are preserved.
+/// Phase-1 artifacts (`rollout_summaries/` and `raw_memories.md`) are preserved.
 pub(super) async fn wipe_consolidation_outputs(root: &Path) -> std::io::Result<()> {
     for file_name in [MEMORY_REGISTRY_FILENAME, LEGACY_CONSOLIDATED_FILENAME] {
         let path = root.join(file_name);
@@ -75,26 +74,39 @@ pub(super) async fn wipe_consolidation_outputs(root: &Path) -> std::io::Result<(
     Ok(())
 }

-async fn rebuild_memory_summary(root: &Path, memories: &[Stage1Output]) -> std::io::Result<()> {
-    let mut body = String::from("# Memory Summary\n\n");
+async fn rebuild_raw_memories_file(root: &Path, memories: &[Stage1Output]) -> std::io::Result<()> {
+    let retained = memories
+        .iter()
+        .take(MAX_RAW_MEMORIES_PER_SCOPE)
+        .collect::<Vec<_>>();
+    let mut body = String::from("# Raw Memories\n\n");

-    if memories.is_empty() {
+    if retained.is_empty() {
         body.push_str("No raw memories yet.\n");
-        return tokio::fs::write(memory_summary_file(root), body).await;
+        return tokio::fs::write(raw_memories_file(root), body).await;
     }

-    body.push_str("Map of concise summaries to thread IDs (latest first):\n\n");
-    for memory in memories.iter().take(MAX_RAW_MEMORIES_PER_SCOPE) {
-        let summary = compact_whitespace(&memory.summary);
-        writeln!(body, "- {summary} (thread: `{}`)", memory.thread_id)
-            .map_err(|err| std::io::Error::other(format!("format memory summary: {err}")))?;
+    body.push_str("Merged stage-1 raw memories (latest first):\n\n");
+    for memory in retained {
+        writeln!(body, "## Thread `{}`", memory.thread_id)
+            .map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
+        writeln!(
+            body,
+            "updated_at: {}",
+            memory.source_updated_at.to_rfc3339()
+        )
+        .map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
+        writeln!(body)
+            .map_err(|err| std::io::Error::other(format!("format raw memories: {err}")))?;
+        body.push_str(memory.raw_memory.trim());
+        body.push_str("\n\n");
     }

-    tokio::fs::write(memory_summary_file(root), body).await
+    tokio::fs::write(raw_memories_file(root), body).await
 }

-async fn prune_raw_memories(root: &Path, keep: &BTreeSet<String>) -> std::io::Result<()> {
-    let dir_path = raw_memories_dir(root);
+async fn prune_rollout_summaries(root: &Path, keep: &BTreeSet<String>) -> std::io::Result<()> {
+    let dir_path = rollout_summaries_dir(root);
     let mut dir = match tokio::fs::read_dir(&dir_path).await {
         Ok(dir) => dir,
         Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
@@ -106,7 +118,7 @@ async fn prune_raw_memories(root: &Path, keep: &BTreeSet<String>) -> std::io::Re
         let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
             continue;
         };
-        let Some(thread_id) = extract_thread_id_from_summary_filename(file_name) else {
+        let Some(thread_id) = extract_thread_id_from_rollout_summary_filename(file_name) else {
             continue;
         };
         if !keep.contains(thread_id)
@@ -114,7 +126,7 @@ async fn prune_raw_memories(root: &Path, keep: &BTreeSet<String>) -> std::io::Re
             && err.kind() != std::io::ErrorKind::NotFound
         {
             warn!(
-                "failed pruning outdated raw memory {}: {err}",
+                "failed pruning outdated rollout summary {}: {err}",
                 path.display()
             );
         }
@@ -123,79 +135,30 @@ async fn prune_raw_memories(root: &Path, keep: &BTreeSet<String>) -> std::io::Re
     Ok(())
 }

-async fn remove_outdated_thread_raw_memories(
-    root: &Path,
-    thread_id: &str,
-    keep_path: &Path,
-) -> std::io::Result<()> {
-    let dir_path = raw_memories_dir(root);
-    let mut dir = match tokio::fs::read_dir(&dir_path).await {
-        Ok(dir) => dir,
-        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
-        Err(err) => return Err(err),
-    };
-
-    while let Some(entry) = dir.next_entry().await? {
-        let path = entry.path();
-        if path == keep_path {
-            continue;
-        }
-        let Some(file_name) = path.file_name().and_then(|name| name.to_str()) else {
-            continue;
-        };
-        let Some(existing_thread_id) = extract_thread_id_from_summary_filename(file_name) else {
-            continue;
-        };
-        if existing_thread_id == thread_id
-            && let Err(err) = tokio::fs::remove_file(&path).await
-            && err.kind() != std::io::ErrorKind::NotFound
-        {
-            warn!(
-                "failed removing outdated raw memory {}: {err}",
-                path.display()
-            );
-        }
-    }
-
-    Ok(())
-}
-
-async fn write_raw_memory_for_thread(
+async fn write_rollout_summary_for_thread(
     root: &Path,
     memory: &Stage1Output,
-) -> std::io::Result<PathBuf> {
-    let path = raw_memories_dir(root).join(format!("{}.md", memory.thread_id));
-
-    remove_outdated_thread_raw_memories(root, &memory.thread_id.to_string(), &path).await?;
+) -> std::io::Result<()> {
+    let path = rollout_summaries_dir(root).join(format!("{}.md", memory.thread_id));

     let mut body = String::new();
     writeln!(body, "thread_id: {}", memory.thread_id)
-        .map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
+        .map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
     writeln!(
         body,
         "updated_at: {}",
         memory.source_updated_at.to_rfc3339()
     )
-    .map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
-    writeln!(body).map_err(|err| std::io::Error::other(format!("format raw memory: {err}")))?;
-    body.push_str(memory.raw_memory.trim());
+    .map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
+    writeln!(body)
+        .map_err(|err| std::io::Error::other(format!("format rollout summary: {err}")))?;
+    body.push_str(&compact_whitespace(&memory.summary));
     body.push('\n');

-    tokio::fs::write(&path, body).await?;
-    Ok(path)
+    tokio::fs::write(path, body).await
 }

-fn extract_thread_id_from_summary_filename(file_name: &str) -> Option<&str> {
+fn extract_thread_id_from_rollout_summary_filename(file_name: &str) -> Option<&str> {
     let stem = file_name.strip_suffix(".md")?;
-    if stem.is_empty() {
-        None
-    } else if let Some((thread_id, _legacy_slug)) = stem.split_once('_') {
-        if thread_id.is_empty() {
-            None
-        } else {
-            Some(thread_id)
-        }
-    } else {
-        Some(stem)
-    }
+    if stem.is_empty() { None } else { Some(stem) }
 }
@@ -2,13 +2,13 @@ use super::rollout::StageOneResponseItemKinds;
 use super::rollout::StageOneRolloutFilter;
 use super::rollout::serialize_filtered_rollout_response_items;
 use super::stage_one::parse_stage_one_output;
-use super::storage::rebuild_memory_summary_from_memories;
-use super::storage::sync_raw_memories_from_memories;
+use super::storage::rebuild_raw_memories_file_from_memories;
+use super::storage::sync_rollout_summaries_from_memories;
 use super::storage::wipe_consolidation_outputs;
 use crate::memories::layout::ensure_layout;
 use crate::memories::layout::memory_root_for_cwd;
-use crate::memories::layout::memory_summary_file;
-use crate::memories::layout::raw_memories_dir;
+use crate::memories::layout::raw_memories_file;
+use crate::memories::layout::rollout_summaries_dir;
 use chrono::TimeZone;
 use chrono::Utc;
 use codex_protocol::ThreadId;
@@ -65,10 +65,20 @@ fn memory_root_encoding_avoids_component_collisions() {

 #[test]
 fn parse_stage_one_output_accepts_fenced_json() {
-    let raw = "```json\n{\"rawMemory\":\"abc\",\"summary\":\"short\"}\n```";
+    let raw = "```json\n{\"raw_memory\":\"abc\",\"rollout_summary\":\"short\",\"rollout_slug\":\"slug\"}\n```";
     let parsed = parse_stage_one_output(raw).expect("parsed");
     assert!(parsed.raw_memory.contains("abc"));
-    assert_eq!(parsed.summary, "short");
+    assert_eq!(parsed.rollout_summary, "short");
+    assert_eq!(parsed.rollout_slug, Some("slug".to_string()));
+}
+
+#[test]
+fn parse_stage_one_output_accepts_legacy_keys() {
+    let raw = r#"{"rawMemory":"abc","summary":"short"}"#;
+    let parsed = parse_stage_one_output(raw).expect("parsed");
+    assert!(parsed.raw_memory.contains("abc"));
+    assert_eq!(parsed.rollout_summary, "short");
+    assert_eq!(parsed.rollout_slug, None);
 }

 #[test]
@@ -172,15 +182,15 @@ fn serialize_filtered_rollout_response_items_filters_by_response_item_kind() {
 }

 #[tokio::test]
-async fn prune_and_rebuild_summary_keeps_latest_memories_only() {
+async fn sync_rollout_summaries_and_raw_memories_file_keeps_latest_memories_only() {
     let dir = tempdir().expect("tempdir");
     let root = dir.path().join("memory");
     ensure_layout(&root).await.expect("ensure layout");

     let keep_id = ThreadId::default().to_string();
     let drop_id = ThreadId::default().to_string();
-    let keep_path = raw_memories_dir(&root).join(format!("{keep_id}.md"));
-    let drop_path = raw_memories_dir(&root).join(format!("{drop_id}.md"));
+    let keep_path = rollout_summaries_dir(&root).join(format!("{keep_id}.md"));
+    let drop_path = rollout_summaries_dir(&root).join(format!("{drop_id}.md"));
     tokio::fs::write(&keep_path, "keep")
         .await
         .expect("write keep");
@@ -196,21 +206,21 @@ async fn prune_and_rebuild_summary_keeps_latest_memories_only() {
         generated_at: Utc.timestamp_opt(101, 0).single().expect("timestamp"),
     }];

-    sync_raw_memories_from_memories(&root, &memories)
+    sync_rollout_summaries_from_memories(&root, &memories)
         .await
-        .expect("sync raw memories");
-    rebuild_memory_summary_from_memories(&root, &memories)
+        .expect("sync rollout summaries");
+    rebuild_raw_memories_file_from_memories(&root, &memories)
         .await
-        .expect("rebuild memory summary");
+        .expect("rebuild raw memories");

     assert!(keep_path.is_file());
     assert!(!drop_path.exists());

-    let summary = tokio::fs::read_to_string(memory_summary_file(&root))
+    let raw_memories = tokio::fs::read_to_string(raw_memories_file(&root))
         .await
-        .expect("read summary");
-    assert!(summary.contains("short summary"));
-    assert!(summary.contains(&keep_id));
+        .expect("read raw memories");
+    assert!(raw_memories.contains("raw memory"));
+    assert!(raw_memories.contains(&keep_id));
 }

 #[tokio::test]
@@ -4,8 +4,12 @@ use serde::Deserialize;
 #[derive(Debug, Clone, Deserialize)]
 pub(super) struct StageOneOutput {
     /// Detailed markdown raw memory for a single rollout.
-    #[serde(rename = "rawMemory", alias = "traceMemory")]
+    #[serde(rename = "raw_memory", alias = "rawMemory", alias = "traceMemory")]
     pub(super) raw_memory: String,
+    /// Optional rollout slug from the model output. Accepted but ignored.
+    #[serde(default)]
+    pub(super) rollout_slug: Option<String>,
     /// Compact summary line used for routing and indexing.
-    pub(super) summary: String,
+    #[serde(rename = "rollout_summary", alias = "summary")]
+    pub(super) rollout_summary: String,
 }
@@ -2,11 +2,11 @@
 Consolidate Codex memories in this directory: {{ memory_root }}

 Phase-1 inputs already prepared in this same directory:
-- `raw_memories/` contains per-thread raw memory markdown files.
-- `memory_summary.md` contains a compact routing map from short summary -> thread id.
+- `rollout_summaries/` contains per-thread rollout summary markdown files (`<thread_id>.md`).
+- `raw_memories.md` contains merged raw memory content from recent stage-1 outputs.

 Consolidation goals:
-1. Read `memory_summary.md` first to route quickly, then open the most relevant files in `raw_memories/`.
+1. Read `rollout_summaries/` first to route quickly, then cross-check details in `raw_memories.md`.
 2. Resolve conflicts explicitly:
    - prefer newer guidance by default;
    - if older guidance has stronger evidence, keep both with a verification note.
@@ -23,5 +23,5 @@ Expected outputs for this directory (create/update as needed):
 - `skills/<skill-name>/...`: optional skill folders when there is clear reusable procedure value.

 Do not rewrite phase-1 artifacts except when adding explicit cross-references:
-- keep `raw_memories/` as phase-1 output;
-- keep `memory_summary.md` as the compact map generated from the latest summaries.
+- keep `rollout_summaries/` as phase-1 output;
+- keep `raw_memories.md` as the merged stage-1 raw-memory artifact.
@@ -1,4 +1,4 @@
-Analyze this rollout and produce `rawMemory` and `summary` as JSON.
+Analyze this rollout and produce `raw_memory`, `rollout_summary`, and optional `rollout_slug` as JSON.

 rollout_context:
 - rollout_path: {{ rollout_path }}
@@ -2,8 +2,9 @@
 You are given one rollout and must produce exactly one JSON object.

 Return exactly one JSON object with this schema:
-- rawMemory: a detailed markdown raw memory for this rollout only.
-- summary: a concise summary suitable for shared memory aggregation.
+- raw_memory: a detailed markdown raw memory for this rollout only.
+- rollout_summary: a concise summary suitable for shared memory aggregation.
+- rollout_slug: optional stable slug for the rollout (accepted but currently ignored).

 Input contract:
 - The user message contains:
@@ -20,7 +21,7 @@ Global writing rules:
 - Do not include markdown fences around the JSON object.
 - Output only the JSON object and nothing else.

-Outcome triage guidance for `Outcome:` labels in `rawMemory`:
+Outcome triage guidance for `Outcome:` labels in `raw_memory`:
 - Use `success` for explicit user approval or clear verification evidence.
 - Use `partial` when there is meaningful progress but incomplete or unverified completion.
 - Use `fail` for explicit dissatisfaction/rejection or hard failure.
@@ -28,7 +29,7 @@ Outcome triage guidance for `Outcome:` labels in `rawMemory`:
 - If the user switched topics without explicit evaluation, usually use `uncertain`.
 - If only assistant claims success without user confirmation or verification, use `uncertain`.

-`rawMemory` structure requirements:
+`raw_memory` structure requirements:
 - Start with `# <one-sentence summary>`.
 - Include:
   - `Memory context: ...`
@@ -42,7 +43,7 @@ Outcome triage guidance for `Outcome:` labels in `rawMemory`:
   - `Pointers and references (annotate why each item matters):`
 - Prefer more, smaller task sections over one broad mixed section.

-`summary` requirements:
+`rollout_summary` requirements:
 - Keep under 120 words.
 - Capture only the most reusable and actionable outcomes.
 - Include concrete paths/commands/errors when high-signal.