From b519267d05459095eccb9767c28550a2952d8a2f Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Fri, 21 Nov 2025 19:06:45 -0800 Subject: [PATCH] Account for encrypted reasoning for auto compaction (#7113) - The total token used returned from the api doesn't account for the reasoning items before the assistant message - Account for those for auto compaction - Add the encrypted reasoning effort in the common tests utils - Add a test to make sure it works as expected --- codex-rs/Cargo.lock | 1 + codex-rs/core/src/codex.rs | 35 ++--- codex-rs/core/src/context_manager/history.rs | 49 +++++++ .../core/src/context_manager/history_tests.rs | 33 +++++ codex-rs/core/src/state/session.rs | 4 + codex-rs/core/src/truncate.rs | 2 +- codex-rs/core/tests/common/Cargo.toml | 1 + codex-rs/core/tests/common/responses.rs | 7 + codex-rs/core/tests/suite/compact.rs | 134 +++++++++++++++++- 9 files changed, 236 insertions(+), 30 deletions(-) diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index ef5d6b272..0bcfd4f37 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1777,6 +1777,7 @@ version = "0.0.0" dependencies = [ "anyhow", "assert_cmd", + "base64", "codex-core", "codex-protocol", "notify", diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index a91024348..96402961b 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -661,6 +661,11 @@ impl Session { format!("auto-compact-{id}") } + async fn get_total_token_usage(&self) -> i64 { + let state = self.state.lock().await; + state.get_total_token_usage() + } + async fn record_initial_history(&self, conversation_history: InitialHistory) { let turn_context = self.new_turn(SessionSettingsUpdate::default()).await; match conversation_history { @@ -1958,20 +1963,13 @@ pub(crate) async fn run_task( .await { Ok(turn_output) => { - let TurnRunResult { - processed_items, - total_token_usage, - } = turn_output; + let processed_items = turn_output; let limit = turn_context .client 
.get_auto_compact_token_limit() .unwrap_or(i64::MAX); - let total_usage_tokens = total_token_usage - .as_ref() - .map(TokenUsage::tokens_in_context_window); - let token_limit_reached = total_usage_tokens - .map(|tokens| tokens >= limit) - .unwrap_or(false); + let total_usage_tokens = sess.get_total_token_usage().await; + let token_limit_reached = total_usage_tokens >= limit; let (responses, items_to_record_in_conversation_history) = process_items(processed_items, &sess, &turn_context).await; @@ -2028,7 +2026,7 @@ async fn run_turn( turn_diff_tracker: SharedTurnDiffTracker, input: Vec, cancellation_token: CancellationToken, -) -> CodexResult { +) -> CodexResult> { let mcp_tools = sess .services .mcp_connection_manager @@ -2159,12 +2157,6 @@ pub struct ProcessedResponseItem { pub response: Option, } -#[derive(Debug)] -struct TurnRunResult { - processed_items: Vec, - total_token_usage: Option, -} - #[allow(clippy::too_many_arguments)] async fn try_run_turn( router: Arc, @@ -2173,7 +2165,7 @@ async fn try_run_turn( turn_diff_tracker: SharedTurnDiffTracker, prompt: &Prompt, cancellation_token: CancellationToken, -) -> CodexResult { +) -> CodexResult> { let rollout_item = RolloutItem::TurnContext(TurnContextItem { cwd: turn_context.cwd.clone(), approval_policy: turn_context.approval_policy, @@ -2335,12 +2327,7 @@ async fn try_run_turn( sess.send_event(&turn_context, msg).await; } - let result = TurnRunResult { - processed_items, - total_token_usage: token_usage.clone(), - }; - - return Ok(result); + return Ok(processed_items); } ResponseEvent::OutputTextDelta(delta) => { // In review child threads, suppress assistant text deltas; the diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 8eefcbf85..035d0d286 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -2,6 +2,7 @@ use crate::codex::TurnContext; use crate::context_manager::normalize; use 
crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; +use crate::truncate::approx_tokens_from_byte_count; use crate::truncate::truncate_function_output_items_with_policy; use crate::truncate::truncate_text; use codex_protocol::models::FunctionCallOutputPayload; @@ -119,6 +120,54 @@ impl ContextManager { ); } + fn get_non_last_reasoning_items_tokens(&self) -> usize { + // get reasoning items excluding all the ones after the last user message + let Some(last_user_index) = self + .items + .iter() + .rposition(|item| matches!(item, ResponseItem::Message { role, .. } if role == "user")) + else { + return 0usize; + }; + + let total_reasoning_bytes = self + .items + .iter() + .take(last_user_index) + .filter_map(|item| { + if let ResponseItem::Reasoning { + encrypted_content: Some(content), + .. + } = item + { + Some(content.len()) + } else { + None + } + }) + .map(Self::estimate_reasoning_length) + .fold(0usize, usize::saturating_add); + + let token_estimate = approx_tokens_from_byte_count(total_reasoning_bytes); + token_estimate as usize + } + + fn estimate_reasoning_length(encoded_len: usize) -> usize { + encoded_len + .saturating_mul(3) + .checked_div(4) + .unwrap_or(0) + .saturating_sub(650) + } + + pub(crate) fn get_total_token_usage(&self) -> i64 { + self.token_info + .as_ref() + .map(|info| info.last_token_usage.total_tokens) + .unwrap_or(0) + .saturating_add(self.get_non_last_reasoning_items_tokens() as i64) + } + /// This function enforces a couple of invariants on the in-memory history: /// 1. every call (function/custom) has a corresponding output entry /// 2. 
every output has a corresponding call entry diff --git a/codex-rs/core/src/context_manager/history_tests.rs b/codex-rs/core/src/context_manager/history_tests.rs index 1a01604a7..cb59c9723 100644 --- a/codex-rs/core/src/context_manager/history_tests.rs +++ b/codex-rs/core/src/context_manager/history_tests.rs @@ -56,6 +56,17 @@ fn reasoning_msg(text: &str) -> ResponseItem { } } +fn reasoning_with_encrypted_content(len: usize) -> ResponseItem { + ResponseItem::Reasoning { + id: String::new(), + summary: vec![ReasoningItemReasoningSummary::SummaryText { + text: "summary".to_string(), + }], + content: None, + encrypted_content: Some("a".repeat(len)), + } +} + fn truncate_exec_output(content: &str) -> String { truncate::truncate_text(content, TruncationPolicy::Tokens(EXEC_FORMAT_MAX_TOKENS)) } @@ -112,6 +123,28 @@ fn filters_non_api_messages() { ); } +#[test] +fn non_last_reasoning_tokens_return_zero_when_no_user_messages() { + let history = create_history_with_items(vec![reasoning_with_encrypted_content(800)]); + + assert_eq!(history.get_non_last_reasoning_items_tokens(), 0); +} + +#[test] +fn non_last_reasoning_tokens_ignore_entries_after_last_user() { + let history = create_history_with_items(vec![ + reasoning_with_encrypted_content(900), + user_msg("first"), + reasoning_with_encrypted_content(1_000), + user_msg("second"), + reasoning_with_encrypted_content(2_000), + ]); + // first: (900 * 0.75 - 650) / 4 = 6.25 tokens + // second: (1000 * 0.75 - 650) / 4 = 25 tokens + // first + second = 31.25, rounded up to 32 + assert_eq!(history.get_non_last_reasoning_items_tokens(), 32); +} + #[test] fn get_history_for_prompt_drops_ghost_commits() { let items = vec![ResponseItem::GhostSnapshot { diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 2dfa5199f..caebac6b8 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -74,4 +74,8 @@ impl SessionState { pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
self.history.set_token_usage_full(context_window); } + + pub(crate) fn get_total_token_usage(&self) -> i64 { + self.history.get_total_token_usage() + } } diff --git a/codex-rs/core/src/truncate.rs b/codex-rs/core/src/truncate.rs index bf883c061..6e38ef698 100644 --- a/codex-rs/core/src/truncate.rs +++ b/codex-rs/core/src/truncate.rs @@ -296,7 +296,7 @@ fn approx_bytes_for_tokens(tokens: usize) -> usize { tokens.saturating_mul(APPROX_BYTES_PER_TOKEN) } -fn approx_tokens_from_byte_count(bytes: usize) -> u64 { +pub(crate) fn approx_tokens_from_byte_count(bytes: usize) -> u64 { let bytes_u64 = bytes as u64; bytes_u64.saturating_add((APPROX_BYTES_PER_TOKEN as u64).saturating_sub(1)) / (APPROX_BYTES_PER_TOKEN as u64) diff --git a/codex-rs/core/tests/common/Cargo.toml b/codex-rs/core/tests/common/Cargo.toml index 094f33a26..4c47fbb52 100644 --- a/codex-rs/core/tests/common/Cargo.toml +++ b/codex-rs/core/tests/common/Cargo.toml @@ -9,6 +9,7 @@ path = "lib.rs" [dependencies] anyhow = { workspace = true } assert_cmd = { workspace = true } +base64 = { workspace = true } codex-core = { workspace = true } codex-protocol = { workspace = true } notify = { workspace = true } diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index b84e96639..7cc59f26e 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -2,6 +2,7 @@ use std::sync::Arc; use std::sync::Mutex; use anyhow::Result; +use base64::Engine; use serde_json::Value; use wiremock::BodyPrintLimit; use wiremock::Match; @@ -297,12 +298,18 @@ pub fn ev_reasoning_item(id: &str, summary: &[&str], raw_content: &[&str]) -> Va .map(|text| serde_json::json!({"type": "summary_text", "text": text})) .collect(); + let overhead = "b".repeat(550); + let raw_content_joined = raw_content.join(""); + let encrypted_content = + base64::engine::general_purpose::STANDARD.encode(overhead + raw_content_joined.as_str()); + let mut event = serde_json::json!({ 
"type": "response.output_item.done", "item": { "type": "reasoning", "id": id, "summary": summary_entries, + "encrypted_content": encrypted_content, } }); diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index 1324d3edb..aa74ec897 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -7,6 +7,7 @@ use codex_core::built_in_model_providers; use codex_core::compact::SUMMARIZATION_PROMPT; use codex_core::compact::SUMMARY_PREFIX; use codex_core::config::Config; +use codex_core::features::Feature; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; use codex_core::protocol::RolloutItem; @@ -27,6 +28,7 @@ use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; use core_test_support::responses::ev_completed_with_tokens; use core_test_support::responses::ev_function_call; +use core_test_support::responses::mount_compact_json_once; use core_test_support::responses::mount_sse_once; use core_test_support::responses::mount_sse_once_match; use core_test_support::responses::mount_sse_sequence; @@ -481,9 +483,14 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { // mock responses from the model + let reasoning_response_1 = ev_reasoning_item("m1", &["I will create a react app"], &[]); + let encrypted_content_1 = reasoning_response_1["item"]["encrypted_content"] + .as_str() + .unwrap(); + // first chunk of work let model_reasoning_response_1_sse = sse(vec![ - ev_reasoning_item("m1", &["I will create a react app"], &[]), + reasoning_response_1.clone(), ev_local_shell_call("r1-shell", "completed", vec!["echo", "make-react"]), ev_completed_with_tokens("r1", token_count_used), ]); @@ -494,9 +501,14 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { ev_completed_with_tokens("r2", token_count_used_after_compaction), ]); + let reasoning_response_2 = ev_reasoning_item("m3", &["I will create a node app"], 
&[]); + let encrypted_content_2 = reasoning_response_2["item"]["encrypted_content"] + .as_str() + .unwrap(); + // second chunk of work let model_reasoning_response_2_sse = sse(vec![ - ev_reasoning_item("m3", &["I will create a node app"], &[]), + reasoning_response_2.clone(), ev_local_shell_call("r3-shell", "completed", vec!["echo", "make-node"]), ev_completed_with_tokens("r3", token_count_used), ]); @@ -507,6 +519,11 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { ev_completed_with_tokens("r4", token_count_used_after_compaction), ]); + let reasoning_response_3 = ev_reasoning_item("m6", &["I will create a python app"], &[]); + let encrypted_content_3 = reasoning_response_3["item"]["encrypted_content"] + .as_str() + .unwrap(); + // third chunk of work let model_reasoning_response_3_sse = sse(vec![ ev_reasoning_item("m6", &["I will create a python app"], &[]), @@ -635,7 +652,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { }, { "content": null, - "encrypted_content": null, + "encrypted_content": encrypted_content_1, "summary": [ { "text": "I will create a react app", @@ -745,7 +762,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { }, { "content": null, - "encrypted_content": null, + "encrypted_content": encrypted_content_2, "summary": [ { "text": "I will create a node app", @@ -855,7 +872,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() { }, { "content": null, - "encrypted_content": null, + "encrypted_content": encrypted_content_3, "summary": [ { "text": "I will create a python app", @@ -1879,3 +1896,110 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { "auto compact request should include the summarization prompt after exceeding 95% (limit {limit})" ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn auto_compact_counts_encrypted_reasoning_before_last_user() { + skip_if_no_network!(); + + let server = 
start_mock_server().await; + + let first_user = "COUNT_PRE_LAST_REASONING"; + let second_user = "TRIGGER_COMPACT_AT_LIMIT"; + + let pre_last_reasoning_content = "a".repeat(2_400); + let post_last_reasoning_content = "b".repeat(4_000); + + let first_turn = sse(vec![ + ev_reasoning_item("pre-reasoning", &["pre"], &[&pre_last_reasoning_content]), + ev_completed_with_tokens("r1", 10), + ]); + let second_turn = sse(vec![ + ev_reasoning_item("post-reasoning", &["post"], &[&post_last_reasoning_content]), + ev_completed_with_tokens("r2", 80), + ]); + let resume_turn = sse(vec![ + ev_assistant_message("m4", FINAL_REPLY), + ev_completed_with_tokens("r4", 1), + ]); + + let request_log = mount_sse_sequence( + &server, + vec![ + // Turn 1: reasoning before last user (should count). + first_turn, + // Turn 2: reasoning after last user (should be ignored for compaction). + second_turn, + // Turn 3: resume after remote compaction. + resume_turn, + ], + ) + .await; + + let compacted_history = vec![codex_protocol::models::ResponseItem::Message { + id: None, + role: "assistant".to_string(), + content: vec![codex_protocol::models::ContentItem::OutputText { + text: "REMOTE_COMPACT_SUMMARY".to_string(), + }], + }]; + let compact_mock = + mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await; + + let codex = test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + set_test_compact_prompt(config); + config.model_auto_compact_token_limit = Some(300); + config.features.enable(Feature::RemoteCompaction); + }) + .build(&server) + .await + .expect("build codex") + .codex; + + for (idx, user) in [first_user, second_user].into_iter().enumerate() { + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { text: user.into() }], + }) + .await + .unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + if idx == 0 { + assert!( + compact_mock.requests().is_empty(), + 
"remote compaction should not run after the first turn" + ); + } + } + + let compact_requests = compact_mock.requests(); + assert_eq!( + compact_requests.len(), + 1, + "remote compaction should run once after the second turn" + ); + assert_eq!( + compact_requests[0].path(), + "/v1/responses/compact", + "remote compaction should hit the compact endpoint" + ); + + let requests = request_log.requests(); + assert_eq!( + requests.len(), + 3, + "conversation should include two user turns and a post-compaction resume" + ); + let second_request_body = requests[1].body_json().to_string(); + assert!( + !second_request_body.contains("REMOTE_COMPACT_SUMMARY"), + "second turn should not include compacted history" + ); + let resume_body = requests[2].body_json().to_string(); + assert!( + resume_body.contains("REMOTE_COMPACT_SUMMARY") || resume_body.contains(FINAL_REPLY), + "resume request should follow remote compact and use compacted history" + ); +}