From c9f5b9a6dfa635af2a2cd93bdd6da01ce540cfcd Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 16 Dec 2025 16:36:33 +0100 Subject: [PATCH] feat: do not compact on last user turn (#8060) --- codex-rs/core/src/codex.rs | 37 +++--- codex-rs/core/tests/suite/compact.rs | 161 +++++++++++++-------------- 2 files changed, 100 insertions(+), 98 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5f0e322da..9d586b1f3 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -2150,6 +2150,16 @@ pub(crate) async fn run_task( if input.is_empty() { return None; } + + let auto_compact_limit = turn_context + .client + .get_model_family() + .auto_compact_token_limit() + .unwrap_or(i64::MAX); + let total_usage_tokens = sess.get_total_token_usage().await; + if total_usage_tokens >= auto_compact_limit { + run_auto_compact(&sess, &turn_context).await; + } let event = EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }); @@ -2232,25 +2242,12 @@ pub(crate) async fn run_task( needs_follow_up, last_agent_message: turn_last_agent_message, } = turn_output; - let limit = turn_context - .client - .get_model_family() - .auto_compact_token_limit() - .unwrap_or(i64::MAX); let total_usage_tokens = sess.get_total_token_usage().await; - let token_limit_reached = total_usage_tokens >= limit; + let token_limit_reached = total_usage_tokens >= auto_compact_limit; // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. 
- if token_limit_reached { - if should_use_remote_compact_task( - sess.as_ref(), - &turn_context.client.get_provider(), - ) { - run_inline_remote_auto_compact_task(sess.clone(), turn_context.clone()) - .await; - } else { - run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await; - } + if token_limit_reached && needs_follow_up { + run_auto_compact(&sess, &turn_context).await; continue; } @@ -2292,6 +2289,14 @@ pub(crate) async fn run_task( last_agent_message } +async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) { + if should_use_remote_compact_task(sess.as_ref(), &turn_context.client.get_provider()) { + run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await; + } else { + run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await; + } +} + #[instrument( skip_all, fields( diff --git a/codex-rs/core/tests/suite/compact.rs b/codex-rs/core/tests/suite/compact.rs index bffb601eb..f223f4d10 100644 --- a/codex-rs/core/tests/suite/compact.rs +++ b/codex-rs/core/tests/suite/compact.rs @@ -1009,7 +1009,6 @@ async fn auto_compact_runs_after_token_limit_hit() { ev_assistant_message("m3", AUTO_SUMMARY_TEXT), ev_completed_with_tokens("r3", 200), ]); - let sse_resume = sse(vec![ev_completed("r3-resume")]); let sse4 = sse(vec![ ev_assistant_message("m4", FINAL_REPLY), ev_completed_with_tokens("r4", 120), ]); @@ -1038,15 +1037,6 @@ async fn auto_compact_runs_after_token_limit_hit() { }; mount_sse_once_match(&server, third_matcher, sse3).await; - let resume_marker = prefixed_auto_summary; - let resume_matcher = move |req: &wiremock::Request| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - body.contains(resume_marker) && !body_contains_text(body, SUMMARIZATION_PROMPT) && !body.contains(POST_AUTO_USER_MSG) - }; - mount_sse_once_match(&server, resume_matcher, sse_resume).await; - let fourth_matcher = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); 
body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT) @@ -1106,8 +1096,8 @@ async fn auto_compact_runs_after_token_limit_hit() { let requests = get_responses_requests(&server).await; assert_eq!( requests.len(), - 5, - "expected user turns, a compaction request, a resumed turn, and the follow-up turn; got {}", + 4, + "expected user turns, a compaction request, and the follow-up turn; got {}", requests.len() ); let is_auto_compact = |req: &wiremock::Request| { @@ -1131,19 +1121,6 @@ async fn auto_compact_runs_after_token_limit_hit() { "auto compact should add a third request" ); - let resume_summary_marker = prefixed_auto_summary; - let resume_index = requests - .iter() - .enumerate() - .find_map(|(idx, req)| { - let body = std::str::from_utf8(&req.body).unwrap_or(""); - (body.contains(resume_summary_marker) - && !body_contains_text(body, SUMMARIZATION_PROMPT) - && !body.contains(POST_AUTO_USER_MSG)) - .then_some(idx) - }) - .expect("resume request missing after compaction"); - let follow_up_index = requests .iter() .enumerate() .find_map(|(idx, req)| { let body = std::str::from_utf8(&req.body).unwrap_or(""); (body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT)) .then_some(idx) }) .expect("follow-up request missing"); - assert_eq!(follow_up_index, 4, "follow-up request should be last"); + assert_eq!(follow_up_index, 3, "follow-up request should be last"); let body_first = requests[0].body_json::<serde_json::Value>().unwrap(); let body_auto = requests[auto_compact_index] .body_json::<serde_json::Value>() .unwrap(); - let body_resume = requests[resume_index] - .body_json::<serde_json::Value>() - .unwrap(); let body_follow_up = requests[follow_up_index] .body_json::<serde_json::Value>() .unwrap(); @@ -1201,23 +1175,6 @@ async fn auto_compact_runs_after_token_limit_hit() { "auto compact should send the summarization prompt as a user message", ); - let input_resume = body_resume.get("input").and_then(|v| v.as_array()).unwrap(); - assert!( - input_resume.iter().any(|item| { - item.get("type").and_then(|v| v.as_str()) == Some("message") && item.get("role").and_then(|v| 
v.as_str()) == Some("user") - && item - .get("content") - .and_then(|v| v.as_array()) - .and_then(|arr| arr.first()) - .and_then(|entry| entry.get("text")) - .and_then(|v| v.as_str()) - .map(|text| text.contains(prefixed_auto_summary)) - .unwrap_or(false) - }), - "resume request should include compacted history" - ); - let input_follow_up = body_follow_up .get("input") .and_then(|v| v.as_array()) @@ -1276,6 +1233,10 @@ async fn auto_compact_persists_rollout_entries() { ev_assistant_message("m3", &auto_summary_payload), ev_completed_with_tokens("r3", 200), ]); + let sse4 = sse(vec![ + ev_assistant_message("m4", FINAL_REPLY), + ev_completed_with_tokens("r4", 120), + ]); let first_matcher = |req: &wiremock::Request| { let body = std::str::from_utf8(&req.body).unwrap_or(""); @@ -1299,12 +1260,19 @@ async fn auto_compact_persists_rollout_entries() { }; mount_sse_once_match(&server, third_matcher, sse3).await; + let fourth_matcher = |req: &wiremock::Request| { + let body = std::str::from_utf8(&req.body).unwrap_or(""); + body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT) + }; + mount_sse_once_match(&server, fourth_matcher, sse4).await; + let model_provider = non_openai_model_provider(&server); let home = TempDir::new().unwrap(); let mut config = load_default_config_for_test(&home); config.model_provider = model_provider; set_test_compact_prompt(&mut config); + config.model_auto_compact_token_limit = Some(200_000); let conversation_manager = ConversationManager::with_models_provider( CodexAuth::from_api_key("dummy"), config.model_provider.clone(), @@ -1335,6 +1303,16 @@ async fn auto_compact_persists_rollout_entries() { .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: POST_AUTO_USER_MSG.into(), + }], + }) + .await + .unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + 
codex.submit(Op::Shutdown).await.unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await; @@ -1731,6 +1709,8 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_ ev_assistant_message("m6", FINAL_REPLY), ev_completed_with_tokens("r6", 120), ]); + let follow_up_user = "FOLLOW_UP_AUTO_COMPACT"; + let final_user = "FINAL_AUTO_COMPACT"; mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4, sse5, sse6]).await; @@ -1751,31 +1731,31 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_ .unwrap() .conversation; - codex - .submit(Op::UserInput { - items: vec![UserInput::Text { - text: MULTI_AUTO_MSG.into(), - }], - }) - .await - .unwrap(); - let mut auto_compact_lifecycle_events = Vec::new(); - loop { - let event = codex.next_event().await.unwrap(); - if event.id.starts_with("auto-compact-") - && matches!( - event.msg, - EventMsg::TaskStarted(_) | EventMsg::TaskComplete(_) - ) - { - auto_compact_lifecycle_events.push(event); - continue; - } - if let EventMsg::TaskComplete(_) = &event.msg - && !event.id.starts_with("auto-compact-") - { - break; + for user in [MULTI_AUTO_MSG, follow_up_user, final_user] { + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { text: user.into() }], + }) + .await + .unwrap(); + + loop { + let event = codex.next_event().await.unwrap(); + if event.id.starts_with("auto-compact-") + && matches!( + event.msg, + EventMsg::TaskStarted(_) | EventMsg::TaskComplete(_) + ) + { + auto_compact_lifecycle_events.push(event); + continue; + } + if let EventMsg::TaskComplete(_) = &event.msg + && !event.id.starts_with("auto-compact-") + { + break; + } } } @@ -1821,6 +1801,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { let context_window = 100; let limit = context_window * 90 / 100; let over_limit_tokens = context_window * 95 / 100 + 1; + let follow_up_user = "FOLLOW_UP_AFTER_LIMIT"; let first_turn = sse(vec![ 
ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"), @@ -1873,6 +1854,17 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() { wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await; + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: follow_up_user.into(), + }], + }) + .await + .unwrap(); + + wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await; + // Assert first request captured expected user message that triggers function call. let first_request = first_turn_mock.single_request().input(); assert!( @@ -1916,6 +1908,7 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() { let first_user = "COUNT_PRE_LAST_REASONING"; let second_user = "TRIGGER_COMPACT_AT_LIMIT"; + let third_user = "AFTER_REMOTE_COMPACT"; let pre_last_reasoning_content = "a".repeat(2_400); let post_last_reasoning_content = "b".repeat(4_000); @@ -1928,7 +1921,7 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() { ev_reasoning_item("post-reasoning", &["post"], &[&post_last_reasoning_content]), ev_completed_with_tokens("r2", 80), ]); - let resume_turn = sse(vec![ + let third_turn = sse(vec![ ev_assistant_message("m4", FINAL_REPLY), ev_completed_with_tokens("r4", 1), ]); @@ -1940,8 +1933,8 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() { first_turn, // Turn 2: reasoning after last user (should be ignored for compaction). second_turn, - // Turn 3: resume after remote compaction. - resume_turn, + // Turn 3: next user turn after remote compaction. 
+ third_turn, ], ) .await; @@ -1973,7 +1966,10 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() { .expect("build codex") .codex; - for (idx, user) in [first_user, second_user].into_iter().enumerate() { + for (idx, user) in [first_user, second_user, third_user] + .into_iter() + .enumerate() + { codex .submit(Op::UserInput { items: vec![UserInput::Text { text: user.into() }], @@ -1982,10 +1978,10 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() { .unwrap(); wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; - if idx == 0 { + if idx < 2 { assert!( compact_mock.requests().is_empty(), - "remote compaction should not run after the first turn" + "remote compaction should not run before the next user turn" ); } } @@ -2006,20 +2002,21 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() { assert_eq!( requests.len(), 3, - "conversation should include two user turns and a post-compaction resume" + "conversation should include three user turns" ); let second_request_body = requests[1].body_json().to_string(); assert!( !second_request_body.contains("REMOTE_COMPACT_SUMMARY"), "second turn should not include compacted history" ); - let resume_body = requests[2].body_json().to_string(); + let third_request_body = requests[2].body_json().to_string(); assert!( - resume_body.contains("REMOTE_COMPACT_SUMMARY") || resume_body.contains(FINAL_REPLY), - "resume request should follow remote compact and use compacted history" + third_request_body.contains("REMOTE_COMPACT_SUMMARY") + || third_request_body.contains(FINAL_REPLY), + "third turn should include compacted history" ); assert!( - resume_body.contains("ENCRYPTED_COMPACTION_SUMMARY"), - "resume request should include compaction summary item" + third_request_body.contains("ENCRYPTED_COMPACTION_SUMMARY"), + "third turn should include compaction summary item" ); }