Account for encrypted reasoning for auto compaction (#7113)
- The total token usage returned by the API doesn't account for the reasoning items that come before the assistant message - Account for those when deciding whether to auto-compact - Add encrypted reasoning content to the common test utils - Add a test to make sure it works as expected
This commit is contained in:
parent
529eb4ff2a
commit
b519267d05
9 changed files with 236 additions and 30 deletions
1
codex-rs/Cargo.lock
generated
1
codex-rs/Cargo.lock
generated
|
|
@ -1777,6 +1777,7 @@ version = "0.0.0"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"assert_cmd",
|
||||
"base64",
|
||||
"codex-core",
|
||||
"codex-protocol",
|
||||
"notify",
|
||||
|
|
|
|||
|
|
@ -661,6 +661,11 @@ impl Session {
|
|||
format!("auto-compact-{id}")
|
||||
}
|
||||
|
||||
async fn get_total_token_usage(&self) -> i64 {
|
||||
let state = self.state.lock().await;
|
||||
state.get_total_token_usage()
|
||||
}
|
||||
|
||||
async fn record_initial_history(&self, conversation_history: InitialHistory) {
|
||||
let turn_context = self.new_turn(SessionSettingsUpdate::default()).await;
|
||||
match conversation_history {
|
||||
|
|
@ -1958,20 +1963,13 @@ pub(crate) async fn run_task(
|
|||
.await
|
||||
{
|
||||
Ok(turn_output) => {
|
||||
let TurnRunResult {
|
||||
processed_items,
|
||||
total_token_usage,
|
||||
} = turn_output;
|
||||
let processed_items = turn_output;
|
||||
let limit = turn_context
|
||||
.client
|
||||
.get_auto_compact_token_limit()
|
||||
.unwrap_or(i64::MAX);
|
||||
let total_usage_tokens = total_token_usage
|
||||
.as_ref()
|
||||
.map(TokenUsage::tokens_in_context_window);
|
||||
let token_limit_reached = total_usage_tokens
|
||||
.map(|tokens| tokens >= limit)
|
||||
.unwrap_or(false);
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let token_limit_reached = total_usage_tokens >= limit;
|
||||
let (responses, items_to_record_in_conversation_history) =
|
||||
process_items(processed_items, &sess, &turn_context).await;
|
||||
|
||||
|
|
@ -2028,7 +2026,7 @@ async fn run_turn(
|
|||
turn_diff_tracker: SharedTurnDiffTracker,
|
||||
input: Vec<ResponseItem>,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> CodexResult<TurnRunResult> {
|
||||
) -> CodexResult<Vec<ProcessedResponseItem>> {
|
||||
let mcp_tools = sess
|
||||
.services
|
||||
.mcp_connection_manager
|
||||
|
|
@ -2159,12 +2157,6 @@ pub struct ProcessedResponseItem {
|
|||
pub response: Option<ResponseInputItem>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TurnRunResult {
|
||||
processed_items: Vec<ProcessedResponseItem>,
|
||||
total_token_usage: Option<TokenUsage>,
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn try_run_turn(
|
||||
router: Arc<ToolRouter>,
|
||||
|
|
@ -2173,7 +2165,7 @@ async fn try_run_turn(
|
|||
turn_diff_tracker: SharedTurnDiffTracker,
|
||||
prompt: &Prompt,
|
||||
cancellation_token: CancellationToken,
|
||||
) -> CodexResult<TurnRunResult> {
|
||||
) -> CodexResult<Vec<ProcessedResponseItem>> {
|
||||
let rollout_item = RolloutItem::TurnContext(TurnContextItem {
|
||||
cwd: turn_context.cwd.clone(),
|
||||
approval_policy: turn_context.approval_policy,
|
||||
|
|
@ -2335,12 +2327,7 @@ async fn try_run_turn(
|
|||
sess.send_event(&turn_context, msg).await;
|
||||
}
|
||||
|
||||
let result = TurnRunResult {
|
||||
processed_items,
|
||||
total_token_usage: token_usage.clone(),
|
||||
};
|
||||
|
||||
return Ok(result);
|
||||
return Ok(processed_items);
|
||||
}
|
||||
ResponseEvent::OutputTextDelta(delta) => {
|
||||
// In review child threads, suppress assistant text deltas; the
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use crate::codex::TurnContext;
|
|||
use crate::context_manager::normalize;
|
||||
use crate::truncate::TruncationPolicy;
|
||||
use crate::truncate::approx_token_count;
|
||||
use crate::truncate::approx_tokens_from_byte_count;
|
||||
use crate::truncate::truncate_function_output_items_with_policy;
|
||||
use crate::truncate::truncate_text;
|
||||
use codex_protocol::models::FunctionCallOutputPayload;
|
||||
|
|
@ -119,6 +120,54 @@ impl ContextManager {
|
|||
);
|
||||
}
|
||||
|
||||
fn get_non_last_reasoning_items_tokens(&self) -> usize {
|
||||
// Only count reasoning items that appear before the last user message; reasoning items after it are excluded.
|
||||
let Some(last_user_index) = self
|
||||
.items
|
||||
.iter()
|
||||
.rposition(|item| matches!(item, ResponseItem::Message { role, .. } if role == "user"))
|
||||
else {
|
||||
return 0usize;
|
||||
};
|
||||
|
||||
let total_reasoning_bytes = self
|
||||
.items
|
||||
.iter()
|
||||
.take(last_user_index)
|
||||
.filter_map(|item| {
|
||||
if let ResponseItem::Reasoning {
|
||||
encrypted_content: Some(content),
|
||||
..
|
||||
} = item
|
||||
{
|
||||
Some(content.len())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.map(Self::estimate_reasoning_length)
|
||||
.fold(0usize, usize::saturating_add);
|
||||
|
||||
let token_estimate = approx_tokens_from_byte_count(total_reasoning_bytes);
|
||||
token_estimate as usize
|
||||
}
|
||||
|
||||
fn estimate_reasoning_length(encoded_len: usize) -> usize {
|
||||
encoded_len
|
||||
.saturating_mul(3)
|
||||
.checked_div(4)
|
||||
.unwrap_or(0)
|
||||
.saturating_sub(650)
|
||||
}
|
||||
|
||||
pub(crate) fn get_total_token_usage(&self) -> i64 {
|
||||
self.token_info
|
||||
.as_ref()
|
||||
.map(|info| info.last_token_usage.total_tokens)
|
||||
.unwrap_or(0)
|
||||
.saturating_add(self.get_non_last_reasoning_items_tokens() as i64)
|
||||
}
|
||||
|
||||
/// This function enforces a couple of invariants on the in-memory history:
|
||||
/// 1. every call (function/custom) has a corresponding output entry
|
||||
/// 2. every output has a corresponding call entry
|
||||
|
|
|
|||
|
|
@ -56,6 +56,17 @@ fn reasoning_msg(text: &str) -> ResponseItem {
|
|||
}
|
||||
}
|
||||
|
||||
fn reasoning_with_encrypted_content(len: usize) -> ResponseItem {
|
||||
ResponseItem::Reasoning {
|
||||
id: String::new(),
|
||||
summary: vec![ReasoningItemReasoningSummary::SummaryText {
|
||||
text: "summary".to_string(),
|
||||
}],
|
||||
content: None,
|
||||
encrypted_content: Some("a".repeat(len)),
|
||||
}
|
||||
}
|
||||
|
||||
fn truncate_exec_output(content: &str) -> String {
|
||||
truncate::truncate_text(content, TruncationPolicy::Tokens(EXEC_FORMAT_MAX_TOKENS))
|
||||
}
|
||||
|
|
@ -112,6 +123,28 @@ fn filters_non_api_messages() {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_last_reasoning_tokens_return_zero_when_no_user_messages() {
|
||||
let history = create_history_with_items(vec![reasoning_with_encrypted_content(800)]);
|
||||
|
||||
assert_eq!(history.get_non_last_reasoning_items_tokens(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn non_last_reasoning_tokens_ignore_entries_after_last_user() {
|
||||
let history = create_history_with_items(vec![
|
||||
reasoning_with_encrypted_content(900),
|
||||
user_msg("first"),
|
||||
reasoning_with_encrypted_content(1_000),
|
||||
user_msg("second"),
|
||||
reasoning_with_encrypted_content(2_000),
|
||||
]);
|
||||
// first: (900 * 0.75 - 650) / 4 = 6.25 tokens
|
||||
// second: (1000 * 0.75 - 650) / 4 = 25 tokens
|
||||
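// first + second = 31.25 tokens, which rounds up to 32 (the byte estimates 25 + 100 = 125 are summed first, then divided by 4 rounding up)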
|
||||
assert_eq!(history.get_non_last_reasoning_items_tokens(), 32);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn get_history_for_prompt_drops_ghost_commits() {
|
||||
let items = vec![ResponseItem::GhostSnapshot {
|
||||
|
|
|
|||
|
|
@ -74,4 +74,8 @@ impl SessionState {
|
|||
pub(crate) fn set_token_usage_full(&mut self, context_window: i64) {
|
||||
self.history.set_token_usage_full(context_window);
|
||||
}
|
||||
|
||||
pub(crate) fn get_total_token_usage(&self) -> i64 {
|
||||
self.history.get_total_token_usage()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -296,7 +296,7 @@ fn approx_bytes_for_tokens(tokens: usize) -> usize {
|
|||
tokens.saturating_mul(APPROX_BYTES_PER_TOKEN)
|
||||
}
|
||||
|
||||
fn approx_tokens_from_byte_count(bytes: usize) -> u64 {
|
||||
pub(crate) fn approx_tokens_from_byte_count(bytes: usize) -> u64 {
|
||||
let bytes_u64 = bytes as u64;
|
||||
bytes_u64.saturating_add((APPROX_BYTES_PER_TOKEN as u64).saturating_sub(1))
|
||||
/ (APPROX_BYTES_PER_TOKEN as u64)
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ path = "lib.rs"
|
|||
[dependencies]
|
||||
anyhow = { workspace = true }
|
||||
assert_cmd = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
codex-core = { workspace = true }
|
||||
codex-protocol = { workspace = true }
|
||||
notify = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ use std::sync::Arc;
|
|||
use std::sync::Mutex;
|
||||
|
||||
use anyhow::Result;
|
||||
use base64::Engine;
|
||||
use serde_json::Value;
|
||||
use wiremock::BodyPrintLimit;
|
||||
use wiremock::Match;
|
||||
|
|
@ -297,12 +298,18 @@ pub fn ev_reasoning_item(id: &str, summary: &[&str], raw_content: &[&str]) -> Va
|
|||
.map(|text| serde_json::json!({"type": "summary_text", "text": text}))
|
||||
.collect();
|
||||
|
||||
let overhead = "b".repeat(550);
|
||||
let raw_content_joined = raw_content.join("");
|
||||
let encrypted_content =
|
||||
base64::engine::general_purpose::STANDARD.encode(overhead + raw_content_joined.as_str());
|
||||
|
||||
let mut event = serde_json::json!({
|
||||
"type": "response.output_item.done",
|
||||
"item": {
|
||||
"type": "reasoning",
|
||||
"id": id,
|
||||
"summary": summary_entries,
|
||||
"encrypted_content": encrypted_content,
|
||||
}
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ use codex_core::built_in_model_providers;
|
|||
use codex_core::compact::SUMMARIZATION_PROMPT;
|
||||
use codex_core::compact::SUMMARY_PREFIX;
|
||||
use codex_core::config::Config;
|
||||
use codex_core::features::Feature;
|
||||
use codex_core::protocol::EventMsg;
|
||||
use codex_core::protocol::Op;
|
||||
use codex_core::protocol::RolloutItem;
|
||||
|
|
@ -27,6 +28,7 @@ use core_test_support::responses::ev_assistant_message;
|
|||
use core_test_support::responses::ev_completed;
|
||||
use core_test_support::responses::ev_completed_with_tokens;
|
||||
use core_test_support::responses::ev_function_call;
|
||||
use core_test_support::responses::mount_compact_json_once;
|
||||
use core_test_support::responses::mount_sse_once;
|
||||
use core_test_support::responses::mount_sse_once_match;
|
||||
use core_test_support::responses::mount_sse_sequence;
|
||||
|
|
@ -481,9 +483,14 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
|
|||
|
||||
// mock responses from the model
|
||||
|
||||
let reasoning_response_1 = ev_reasoning_item("m1", &["I will create a react app"], &[]);
|
||||
let encrypted_content_1 = reasoning_response_1["item"]["encrypted_content"]
|
||||
.as_str()
|
||||
.unwrap();
|
||||
|
||||
// first chunk of work
|
||||
let model_reasoning_response_1_sse = sse(vec![
|
||||
ev_reasoning_item("m1", &["I will create a react app"], &[]),
|
||||
reasoning_response_1.clone(),
|
||||
ev_local_shell_call("r1-shell", "completed", vec!["echo", "make-react"]),
|
||||
ev_completed_with_tokens("r1", token_count_used),
|
||||
]);
|
||||
|
|
@ -494,9 +501,14 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
|
|||
ev_completed_with_tokens("r2", token_count_used_after_compaction),
|
||||
]);
|
||||
|
||||
let reasoning_response_2 = ev_reasoning_item("m3", &["I will create a node app"], &[]);
|
||||
let encrypted_content_2 = reasoning_response_2["item"]["encrypted_content"]
|
||||
.as_str()
|
||||
.unwrap();
|
||||
|
||||
// second chunk of work
|
||||
let model_reasoning_response_2_sse = sse(vec![
|
||||
ev_reasoning_item("m3", &["I will create a node app"], &[]),
|
||||
reasoning_response_2.clone(),
|
||||
ev_local_shell_call("r3-shell", "completed", vec!["echo", "make-node"]),
|
||||
ev_completed_with_tokens("r3", token_count_used),
|
||||
]);
|
||||
|
|
@ -507,6 +519,11 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
|
|||
ev_completed_with_tokens("r4", token_count_used_after_compaction),
|
||||
]);
|
||||
|
||||
let reasoning_response_3 = ev_reasoning_item("m6", &["I will create a python app"], &[]);
|
||||
let encrypted_content_3 = reasoning_response_3["item"]["encrypted_content"]
|
||||
.as_str()
|
||||
.unwrap();
|
||||
|
||||
// third chunk of work
|
||||
let model_reasoning_response_3_sse = sse(vec![
|
||||
ev_reasoning_item("m6", &["I will create a python app"], &[]),
|
||||
|
|
@ -635,7 +652,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
|
|||
},
|
||||
{
|
||||
"content": null,
|
||||
"encrypted_content": null,
|
||||
"encrypted_content": encrypted_content_1,
|
||||
"summary": [
|
||||
{
|
||||
"text": "I will create a react app",
|
||||
|
|
@ -745,7 +762,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
|
|||
},
|
||||
{
|
||||
"content": null,
|
||||
"encrypted_content": null,
|
||||
"encrypted_content": encrypted_content_2,
|
||||
"summary": [
|
||||
{
|
||||
"text": "I will create a node app",
|
||||
|
|
@ -855,7 +872,7 @@ async fn multiple_auto_compact_per_task_runs_after_token_limit_hit() {
|
|||
},
|
||||
{
|
||||
"content": null,
|
||||
"encrypted_content": null,
|
||||
"encrypted_content": encrypted_content_3,
|
||||
"summary": [
|
||||
{
|
||||
"text": "I will create a python app",
|
||||
|
|
@ -1879,3 +1896,110 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
|
|||
"auto compact request should include the summarization prompt after exceeding 95% (limit {limit})"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
|
||||
skip_if_no_network!();
|
||||
|
||||
let server = start_mock_server().await;
|
||||
|
||||
let first_user = "COUNT_PRE_LAST_REASONING";
|
||||
let second_user = "TRIGGER_COMPACT_AT_LIMIT";
|
||||
|
||||
let pre_last_reasoning_content = "a".repeat(2_400);
|
||||
let post_last_reasoning_content = "b".repeat(4_000);
|
||||
|
||||
let first_turn = sse(vec![
|
||||
ev_reasoning_item("pre-reasoning", &["pre"], &[&pre_last_reasoning_content]),
|
||||
ev_completed_with_tokens("r1", 10),
|
||||
]);
|
||||
let second_turn = sse(vec![
|
||||
ev_reasoning_item("post-reasoning", &["post"], &[&post_last_reasoning_content]),
|
||||
ev_completed_with_tokens("r2", 80),
|
||||
]);
|
||||
let resume_turn = sse(vec![
|
||||
ev_assistant_message("m4", FINAL_REPLY),
|
||||
ev_completed_with_tokens("r4", 1),
|
||||
]);
|
||||
|
||||
let request_log = mount_sse_sequence(
|
||||
&server,
|
||||
vec![
|
||||
// Turn 1: reasoning before last user (should count).
|
||||
first_turn,
|
||||
// Turn 2: reasoning after last user (should be ignored for compaction).
|
||||
second_turn,
|
||||
// Turn 3: resume after remote compaction.
|
||||
resume_turn,
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
||||
let compacted_history = vec![codex_protocol::models::ResponseItem::Message {
|
||||
id: None,
|
||||
role: "assistant".to_string(),
|
||||
content: vec![codex_protocol::models::ContentItem::OutputText {
|
||||
text: "REMOTE_COMPACT_SUMMARY".to_string(),
|
||||
}],
|
||||
}];
|
||||
let compact_mock =
|
||||
mount_compact_json_once(&server, serde_json::json!({ "output": compacted_history })).await;
|
||||
|
||||
let codex = test_codex()
|
||||
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
|
||||
.with_config(|config| {
|
||||
set_test_compact_prompt(config);
|
||||
config.model_auto_compact_token_limit = Some(300);
|
||||
config.features.enable(Feature::RemoteCompaction);
|
||||
})
|
||||
.build(&server)
|
||||
.await
|
||||
.expect("build codex")
|
||||
.codex;
|
||||
|
||||
for (idx, user) in [first_user, second_user].into_iter().enumerate() {
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text { text: user.into() }],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
if idx == 0 {
|
||||
assert!(
|
||||
compact_mock.requests().is_empty(),
|
||||
"remote compaction should not run after the first turn"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let compact_requests = compact_mock.requests();
|
||||
assert_eq!(
|
||||
compact_requests.len(),
|
||||
1,
|
||||
"remote compaction should run once after the second turn"
|
||||
);
|
||||
assert_eq!(
|
||||
compact_requests[0].path(),
|
||||
"/v1/responses/compact",
|
||||
"remote compaction should hit the compact endpoint"
|
||||
);
|
||||
|
||||
let requests = request_log.requests();
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"conversation should include two user turns and a post-compaction resume"
|
||||
);
|
||||
let second_request_body = requests[1].body_json().to_string();
|
||||
assert!(
|
||||
!second_request_body.contains("REMOTE_COMPACT_SUMMARY"),
|
||||
"second turn should not include compacted history"
|
||||
);
|
||||
let resume_body = requests[2].body_json().to_string();
|
||||
assert!(
|
||||
resume_body.contains("REMOTE_COMPACT_SUMMARY") || resume_body.contains(FINAL_REPLY),
|
||||
"resume request should follow remote compact and use compacted history"
|
||||
);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue