diff --git a/codex-rs/core/src/memories/mod.rs b/codex-rs/core/src/memories/mod.rs index afc8ff7c8..219fb2ac3 100644 --- a/codex-rs/core/src/memories/mod.rs +++ b/codex-rs/core/src/memories/mod.rs @@ -15,53 +15,64 @@ mod tests; use std::path::Path; use std::path::PathBuf; -/// Subagent source label used to identify consolidation tasks. -const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation"; -const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries"; -const RAW_MEMORIES_FILENAME: &str = "raw_memories.md"; -/// Maximum number of rollout candidates processed per startup pass. -const MAX_ROLLOUTS_PER_STARTUP: usize = 64; -/// Concurrency cap for startup memory extraction and consolidation scheduling. -const PHASE_ONE_CONCURRENCY_LIMIT: usize = MAX_ROLLOUTS_PER_STARTUP; -/// Maximum number of recent raw memories retained for global consolidation. -const MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024; -/// Fallback stage-1 rollout truncation limit (tokens) when model metadata -/// does not include a valid context window. -const DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT: usize = 150_000; -/// Maximum number of tokens from `memory_summary.md` injected into memory tool -/// developer instructions. -const MEMORY_TOOL_DEVELOPER_INSTRUCTIONS_SUMMARY_TOKEN_LIMIT: usize = 5_000; -/// Portion of the model effective input window reserved for the stage-1 rollout -/// input. -/// -/// Keeping this below 100% leaves room for system instructions, prompt framing, -/// and model output. -const STAGE_ONE_CONTEXT_WINDOW_PERCENT: i64 = 70; -/// Maximum rollout age considered for phase-1 extraction. -const PHASE_ONE_MAX_ROLLOUT_AGE_DAYS: i64 = 30; -/// Minimum rollout idle time required before phase-1 extraction. -const PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS: i64 = 12; -/// Lease duration (seconds) for phase-1 job ownership. -const PHASE_ONE_JOB_LEASE_SECONDS: i64 = 3_600; -/// Backoff delay (seconds) before retrying a failed stage-1 extraction job. -const PHASE_ONE_JOB_RETRY_DELAY_SECONDS: i64 = 3_600; -/// Lease duration (seconds) for phase-2 consolidation job ownership. -const PHASE_TWO_JOB_LEASE_SECONDS: i64 = 3_600; -/// Backoff delay (seconds) before retrying a failed phase-2 consolidation job. -const PHASE_TWO_JOB_RETRY_DELAY_SECONDS: i64 = 3_600; -/// Heartbeat interval (seconds) for phase-2 running jobs. -const PHASE_TWO_JOB_HEARTBEAT_SECONDS: u64 = 30; +mod artifacts { + pub(super) const ROLLOUT_SUMMARIES_SUBDIR: &str = "rollout_summaries"; + pub(super) const RAW_MEMORIES_FILENAME: &str = "raw_memories.md"; +} + +/// Phase 1 (startup extraction). +mod phase_one { + /// Maximum number of rollout candidates processed per startup pass. + pub(super) const MAX_ROLLOUTS_PER_STARTUP: usize = 64; + /// Concurrency cap for startup memory extraction and consolidation scheduling. + pub(super) const CONCURRENCY_LIMIT: usize = MAX_ROLLOUTS_PER_STARTUP; + /// Fallback stage-1 rollout truncation limit (tokens) when model metadata + /// does not include a valid context window. + pub(super) const DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT: usize = 150_000; + /// Maximum number of tokens from `memory_summary.md` injected into memory + /// tool developer instructions. + pub(super) const MEMORY_TOOL_DEVELOPER_INSTRUCTIONS_SUMMARY_TOKEN_LIMIT: usize = 5_000; + /// Portion of the model effective input window reserved for the stage-1 + /// rollout input. + /// + /// Keeping this below 100% leaves room for system instructions, prompt + /// framing, and model output. + pub(super) const CONTEXT_WINDOW_PERCENT: i64 = 70; + /// Maximum rollout age considered for phase-1 extraction. + pub(super) const MAX_ROLLOUT_AGE_DAYS: i64 = 30; + /// Minimum rollout idle time required before phase-1 extraction. + pub(super) const MIN_ROLLOUT_IDLE_HOURS: i64 = 12; + /// Lease duration (seconds) for phase-1 job ownership. + pub(super) const JOB_LEASE_SECONDS: i64 = 3_600; + /// Backoff delay (seconds) before retrying a failed stage-1 extraction job. + pub(super) const JOB_RETRY_DELAY_SECONDS: i64 = 3_600; +} + +/// Phase 2 (aka `Consolidation`). +mod phase_two { + /// Subagent source label used to identify consolidation tasks. + pub(super) const MEMORY_CONSOLIDATION_SUBAGENT_LABEL: &str = "memory_consolidation"; + /// Maximum number of recent raw memories retained for global consolidation. + pub(super) const MAX_RAW_MEMORIES_FOR_GLOBAL: usize = 1_024; + /// Lease duration (seconds) for phase-2 consolidation job ownership. + pub(super) const JOB_LEASE_SECONDS: i64 = 3_600; + /// Backoff delay (seconds) before retrying a failed phase-2 consolidation + /// job. + pub(super) const JOB_RETRY_DELAY_SECONDS: i64 = 3_600; + /// Heartbeat interval (seconds) for phase-2 running jobs. + pub(super) const JOB_HEARTBEAT_SECONDS: u64 = 30; +} pub fn memory_root(codex_home: &Path) -> PathBuf { codex_home.join("memories") } fn rollout_summaries_dir(root: &Path) -> PathBuf { - root.join(ROLLOUT_SUMMARIES_SUBDIR) + root.join(artifacts::ROLLOUT_SUMMARIES_SUBDIR) } fn raw_memories_file(root: &Path) -> PathBuf { - root.join(RAW_MEMORIES_FILENAME) + root.join(artifacts::RAW_MEMORIES_FILENAME) } async fn ensure_layout(root: &Path) -> std::io::Result<()> { diff --git a/codex-rs/core/src/memories/prompts.rs b/codex-rs/core/src/memories/prompts.rs index 3e8921404..33b1834c4 100644 --- a/codex-rs/core/src/memories/prompts.rs +++ b/codex-rs/core/src/memories/prompts.rs @@ -1,7 +1,5 @@ -use crate::memories::DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT; -use crate::memories::MEMORY_TOOL_DEVELOPER_INSTRUCTIONS_SUMMARY_TOKEN_LIMIT; -use crate::memories::STAGE_ONE_CONTEXT_WINDOW_PERCENT; use crate::memories::memory_root; +use crate::memories::phase_one; use crate::truncate::TruncationPolicy; use crate::truncate::truncate_text; use askama::Template; @@ -58,9 +56,9 @@ pub(super) fn build_stage_one_input_message( .context_window .and_then(|limit| (limit > 0).then_some(limit)) .map(|limit| limit.saturating_mul(model_info.effective_context_window_percent) / 100) - .map(|limit| (limit.saturating_mul(STAGE_ONE_CONTEXT_WINDOW_PERCENT) / 100).max(1)) + .map(|limit| (limit.saturating_mul(phase_one::CONTEXT_WINDOW_PERCENT) / 100).max(1)) .and_then(|limit| usize::try_from(limit).ok()) - .unwrap_or(DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT); + .unwrap_or(phase_one::DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT); let truncated_rollout_contents = truncate_text( rollout_contents, TruncationPolicy::Tokens(rollout_token_limit), @@ -86,7 +84,7 @@ pub(crate) async fn build_memory_tool_developer_instructions(codex_home: &Path) .to_string(); let memory_summary = truncate_text( &memory_summary, - TruncationPolicy::Tokens(MEMORY_TOOL_DEVELOPER_INSTRUCTIONS_SUMMARY_TOKEN_LIMIT), + TruncationPolicy::Tokens(phase_one::MEMORY_TOOL_DEVELOPER_INSTRUCTIONS_SUMMARY_TOKEN_LIMIT), ); if memory_summary.is_empty() { return None; @@ -111,7 +109,7 @@ mod tests { model_info.context_window = Some(123_000); let expected_rollout_token_limit = usize::try_from( ((123_000_i64 * model_info.effective_context_window_percent) / 100) - * STAGE_ONE_CONTEXT_WINDOW_PERCENT + * phase_one::CONTEXT_WINDOW_PERCENT / 100, ) .unwrap(); @@ -140,7 +138,7 @@ mod tests { model_info.context_window = None; let expected_truncated = truncate_text( &input, - TruncationPolicy::Tokens(DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT), + TruncationPolicy::Tokens(phase_one::DEFAULT_STAGE_ONE_ROLLOUT_TOKEN_LIMIT), ); let message = build_stage_one_input_message( &model_info, diff --git a/codex-rs/core/src/memories/startup/dispatch.rs b/codex-rs/core/src/memories/startup/dispatch.rs index aaf21fed8..9a25faa3e 100644 --- a/codex-rs/core/src/memories/startup/dispatch.rs +++ b/codex-rs/core/src/memories/startup/dispatch.rs @@ -2,6 +2,11 @@ use crate::codex::Session; use crate::config::Config; use crate::config::Constrained; use crate::memories::memory_root; +use crate::memories::phase_two; +use crate::memories::prompts::build_consolidation_prompt; +use crate::memories::startup::phase2::spawn_phase2_completion_task; +use crate::memories::storage::rebuild_raw_memories_file_from_memories; +use crate::memories::storage::sync_rollout_summaries_from_memories; use codex_protocol::protocol::AskForApproval; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; @@ -13,15 +18,6 @@ use tracing::debug; use tracing::info; use tracing::warn; -use super::super::MAX_RAW_MEMORIES_FOR_GLOBAL; -use super::super::MEMORY_CONSOLIDATION_SUBAGENT_LABEL; -use super::super::PHASE_TWO_JOB_LEASE_SECONDS; -use super::super::PHASE_TWO_JOB_RETRY_DELAY_SECONDS; -use super::super::prompts::build_consolidation_prompt; -use super::super::storage::rebuild_raw_memories_file_from_memories; -use super::super::storage::sync_rollout_summaries_from_memories; -use super::phase2::spawn_phase2_completion_task; - fn completion_watermark( claimed_watermark: i64, latest_memories: &[codex_state::Stage1Output], @@ -44,7 +40,7 @@ pub(super) async fn run_global_memory_consolidation( }; let claim = match state_db - .try_claim_global_phase2_job(session.conversation_id, PHASE_TWO_JOB_LEASE_SECONDS) + .try_claim_global_phase2_job(session.conversation_id, phase_two::JOB_LEASE_SECONDS) .await { Ok(claim) => claim, @@ -97,7 +93,7 @@ pub(super) async fn run_global_memory_consolidation( .mark_global_phase2_job_failed( &ownership_token, "consolidation sandbox policy was rejected by constraints", - PHASE_TWO_JOB_RETRY_DELAY_SECONDS, + phase_two::JOB_RETRY_DELAY_SECONDS, ) .await; return false; @@ -106,7 +102,7 @@ pub(super) async fn run_global_memory_consolidation( }; let latest_memories = match state_db - .list_stage1_outputs_for_global(MAX_RAW_MEMORIES_FOR_GLOBAL) + .list_stage1_outputs_for_global(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) .await { Ok(memories) => memories, @@ -116,7 +112,7 @@ pub(super) async fn run_global_memory_consolidation( .mark_global_phase2_job_failed( &ownership_token, "failed to read stage-1 outputs before global consolidation", - PHASE_TWO_JOB_RETRY_DELAY_SECONDS, + phase_two::JOB_RETRY_DELAY_SECONDS, ) .await; return false; @@ -129,7 +125,7 @@ pub(super) async fn run_global_memory_consolidation( .mark_global_phase2_job_failed( &ownership_token, "failed syncing local memory artifacts", - PHASE_TWO_JOB_RETRY_DELAY_SECONDS, + phase_two::JOB_RETRY_DELAY_SECONDS, ) .await; return false; @@ -141,7 +137,7 @@ pub(super) async fn run_global_memory_consolidation( .mark_global_phase2_job_failed( &ownership_token, "failed rebuilding raw memories aggregate", - PHASE_TWO_JOB_RETRY_DELAY_SECONDS, + phase_two::JOB_RETRY_DELAY_SECONDS, ) .await; return false; @@ -160,7 +156,7 @@ pub(super) async fn run_global_memory_consolidation( text_elements: vec![], }]; let source = SessionSource::SubAgent(SubAgentSource::Other( - MEMORY_CONSOLIDATION_SUBAGENT_LABEL.to_string(), + phase_two::MEMORY_CONSOLIDATION_SUBAGENT_LABEL.to_string(), )); match session @@ -187,7 +183,7 @@ pub(super) async fn run_global_memory_consolidation( .mark_global_phase2_job_failed( &ownership_token, "failed to spawn consolidation agent", - PHASE_TWO_JOB_RETRY_DELAY_SECONDS, + phase_two::JOB_RETRY_DELAY_SECONDS, ) .await; false diff --git a/codex-rs/core/src/memories/startup/extract.rs b/codex-rs/core/src/memories/startup/extract.rs index 1b91239aa..1ccddd7ff 100644 --- a/codex-rs/core/src/memories/startup/extract.rs +++ b/codex-rs/core/src/memories/startup/extract.rs @@ -11,12 +11,12 @@ use codex_protocol::models::ResponseItem; use futures::StreamExt; use tracing::warn; -use super::StageOneRequestContext; use crate::memories::prompts::build_stage_one_input_message; use crate::memories::stage_one::RAW_MEMORY_PROMPT; use crate::memories::stage_one::StageOneOutput; use crate::memories::stage_one::parse_stage_one_output; use crate::memories::stage_one::stage_one_output_schema; +use crate::memories::startup::StageOneRequestContext; use crate::rollout::policy::should_persist_response_item_for_memories; use codex_protocol::protocol::RolloutItem; use std::path::Path; diff --git a/codex-rs/core/src/memories/startup/mod.rs b/codex-rs/core/src/memories/startup/mod.rs index 476853e0c..1422df7ac 100644 --- a/codex-rs/core/src/memories/startup/mod.rs +++ b/codex-rs/core/src/memories/startup/mod.rs @@ -7,6 +7,7 @@ use crate::codex::TurnContext; use crate::config::Config; use crate::error::Result as CodexResult; use crate::features::Feature; +use crate::memories::phase_one; use crate::rollout::INTERACTIVE_SESSION_SOURCES; use codex_otel::OtelManager; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; @@ -93,11 +94,11 @@ pub(super) async fn run_memories_startup_pipeline( session.conversation_id, codex_state::Stage1StartupClaimParams { scan_limit: PHASE_ONE_THREAD_SCAN_LIMIT, - max_claimed: super::MAX_ROLLOUTS_PER_STARTUP, - max_age_days: super::PHASE_ONE_MAX_ROLLOUT_AGE_DAYS, - min_rollout_idle_hours: super::PHASE_ONE_MIN_ROLLOUT_IDLE_HOURS, + max_claimed: phase_one::MAX_ROLLOUTS_PER_STARTUP, + max_age_days: phase_one::MAX_ROLLOUT_AGE_DAYS, + min_rollout_idle_hours: phase_one::MIN_ROLLOUT_IDLE_HOURS, allowed_sources: allowed_sources.as_slice(), - lease_seconds: super::PHASE_ONE_JOB_LEASE_SECONDS, + lease_seconds: phase_one::JOB_LEASE_SECONDS, }, ) .await @@ -140,7 +141,7 @@ pub(super) async fn run_memories_startup_pipeline( thread.id, &claim.ownership_token, reason, - super::PHASE_ONE_JOB_RETRY_DELAY_SECONDS, + phase_one::JOB_RETRY_DELAY_SECONDS, ) .await; } @@ -173,7 +174,7 @@ pub(super) async fn run_memories_startup_pipeline( .unwrap_or(false) } }) - .buffer_unordered(super::PHASE_ONE_CONCURRENCY_LIMIT) + .buffer_unordered(phase_one::CONCURRENCY_LIMIT) .collect::>() .await .into_iter() diff --git a/codex-rs/core/src/memories/startup/phase2.rs b/codex-rs/core/src/memories/startup/phase2.rs index 137add783..2e839384b 100644 --- a/codex-rs/core/src/memories/startup/phase2.rs +++ b/codex-rs/core/src/memories/startup/phase2.rs @@ -1,6 +1,7 @@ use crate::agent::AgentStatus; use crate::agent::status::is_final as is_final_agent_status; use crate::codex::Session; +use crate::memories::phase_two; use codex_protocol::ThreadId; use std::sync::Arc; use std::time::Duration; @@ -9,10 +10,6 @@ use tracing::debug; use tracing::info; use tracing::warn; -use super::super::PHASE_TWO_JOB_HEARTBEAT_SECONDS; -use super::super::PHASE_TWO_JOB_LEASE_SECONDS; -use super::super::PHASE_TWO_JOB_RETRY_DELAY_SECONDS; - pub(super) fn spawn_phase2_completion_task( session: &Session, ownership_token: String, @@ -74,7 +71,7 @@ async fn run_phase2_completion_task( ) -> AgentStatus { let final_status = { let mut heartbeat_interval = - tokio::time::interval(Duration::from_secs(PHASE_TWO_JOB_HEARTBEAT_SECONDS)); + tokio::time::interval(Duration::from_secs(phase_two::JOB_HEARTBEAT_SECONDS)); heartbeat_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { @@ -94,7 +91,10 @@ async fn run_phase2_completion_task( } _ = heartbeat_interval.tick() => { match state_db - .heartbeat_global_phase2_job(&ownership_token, PHASE_TWO_JOB_LEASE_SECONDS) + .heartbeat_global_phase2_job( + &ownership_token, + phase_two::JOB_LEASE_SECONDS, + ) .await { Ok(true) => {} @@ -162,7 +162,7 @@ async fn mark_phase2_failed_with_recovery( .mark_global_phase2_job_failed( ownership_token, failure_reason, - PHASE_TWO_JOB_RETRY_DELAY_SECONDS, + phase_two::JOB_RETRY_DELAY_SECONDS, ) .await { @@ -171,7 +171,7 @@ async fn mark_phase2_failed_with_recovery( .mark_global_phase2_job_failed_if_unowned( ownership_token, failure_reason, - PHASE_TWO_JOB_RETRY_DELAY_SECONDS, + phase_two::JOB_RETRY_DELAY_SECONDS, ) .await { diff --git a/codex-rs/core/src/memories/storage.rs b/codex-rs/core/src/memories/storage.rs index a366623b9..1b1704346 100644 --- a/codex-rs/core/src/memories/storage.rs +++ b/codex-rs/core/src/memories/storage.rs @@ -4,10 +4,10 @@ use std::fmt::Write as _; use std::path::Path; use tracing::warn; -use super::MAX_RAW_MEMORIES_FOR_GLOBAL; -use super::ensure_layout; -use super::raw_memories_file; -use super::rollout_summaries_dir; +use crate::memories::ensure_layout; +use crate::memories::phase_two; +use crate::memories::raw_memories_file; +use crate::memories::rollout_summaries_dir; /// Rebuild `raw_memories.md` from DB-backed stage-1 outputs. pub(super) async fn rebuild_raw_memories_file_from_memories( @@ -27,7 +27,7 @@ pub(super) async fn sync_rollout_summaries_from_memories( let retained = memories .iter() - .take(MAX_RAW_MEMORIES_FOR_GLOBAL) + .take(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) .collect::>(); let keep = retained .iter() @@ -63,7 +63,7 @@ pub(super) async fn sync_rollout_summaries_from_memories( async fn rebuild_raw_memories_file(root: &Path, memories: &[Stage1Output]) -> std::io::Result<()> { let retained = memories .iter() - .take(MAX_RAW_MEMORIES_FOR_GLOBAL) + .take(phase_two::MAX_RAW_MEMORIES_FOR_GLOBAL) .collect::>(); let mut body = String::from("# Raw Memories\n\n");