feat: do not compact on last user turn (#8060)
This commit is contained in:
parent
ae57e18947
commit
c9f5b9a6df
2 changed files with 100 additions and 98 deletions
|
|
@ -2150,6 +2150,16 @@ pub(crate) async fn run_task(
|
|||
if input.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
let auto_compact_limit = turn_context
|
||||
.client
|
||||
.get_model_family()
|
||||
.auto_compact_token_limit()
|
||||
.unwrap_or(i64::MAX);
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
if total_usage_tokens >= auto_compact_limit {
|
||||
run_auto_compact(&sess, &turn_context).await;
|
||||
}
|
||||
let event = EventMsg::TaskStarted(TaskStartedEvent {
|
||||
model_context_window: turn_context.client.get_model_context_window(),
|
||||
});
|
||||
|
|
@ -2232,25 +2242,12 @@ pub(crate) async fn run_task(
|
|||
needs_follow_up,
|
||||
last_agent_message: turn_last_agent_message,
|
||||
} = turn_output;
|
||||
let limit = turn_context
|
||||
.client
|
||||
.get_model_family()
|
||||
.auto_compact_token_limit()
|
||||
.unwrap_or(i64::MAX);
|
||||
let total_usage_tokens = sess.get_total_token_usage().await;
|
||||
let token_limit_reached = total_usage_tokens >= limit;
|
||||
let token_limit_reached = total_usage_tokens >= auto_compact_limit;
|
||||
|
||||
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
|
||||
if token_limit_reached {
|
||||
if should_use_remote_compact_task(
|
||||
sess.as_ref(),
|
||||
&turn_context.client.get_provider(),
|
||||
) {
|
||||
run_inline_remote_auto_compact_task(sess.clone(), turn_context.clone())
|
||||
.await;
|
||||
} else {
|
||||
run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
|
||||
}
|
||||
if token_limit_reached && needs_follow_up {
|
||||
run_auto_compact(&sess, &turn_context).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -2292,6 +2289,14 @@ pub(crate) async fn run_task(
|
|||
last_agent_message
|
||||
}
|
||||
|
||||
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
|
||||
if should_use_remote_compact_task(sess.as_ref(), &turn_context.client.get_provider()) {
|
||||
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
|
||||
} else {
|
||||
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(
|
||||
skip_all,
|
||||
fields(
|
||||
|
|
|
|||
|
|
@ -1009,7 +1009,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
|
|||
ev_assistant_message("m3", AUTO_SUMMARY_TEXT),
|
||||
ev_completed_with_tokens("r3", 200),
|
||||
]);
|
||||
let sse_resume = sse(vec![ev_completed("r3-resume")]);
|
||||
let sse4 = sse(vec![
|
||||
ev_assistant_message("m4", FINAL_REPLY),
|
||||
ev_completed_with_tokens("r4", 120),
|
||||
|
|
@ -1038,15 +1037,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
|
|||
};
|
||||
mount_sse_once_match(&server, third_matcher, sse3).await;
|
||||
|
||||
let resume_marker = prefixed_auto_summary;
|
||||
let resume_matcher = move |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body.contains(resume_marker)
|
||||
&& !body_contains_text(body, SUMMARIZATION_PROMPT)
|
||||
&& !body.contains(POST_AUTO_USER_MSG)
|
||||
};
|
||||
mount_sse_once_match(&server, resume_matcher, sse_resume).await;
|
||||
|
||||
let fourth_matcher = |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT)
|
||||
|
|
@ -1106,8 +1096,8 @@ async fn auto_compact_runs_after_token_limit_hit() {
|
|||
let requests = get_responses_requests(&server).await;
|
||||
assert_eq!(
|
||||
requests.len(),
|
||||
5,
|
||||
"expected user turns, a compaction request, a resumed turn, and the follow-up turn; got {}",
|
||||
4,
|
||||
"expected user turns, a compaction request, and the follow-up turn; got {}",
|
||||
requests.len()
|
||||
);
|
||||
let is_auto_compact = |req: &wiremock::Request| {
|
||||
|
|
@ -1131,19 +1121,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
|
|||
"auto compact should add a third request"
|
||||
);
|
||||
|
||||
let resume_summary_marker = prefixed_auto_summary;
|
||||
let resume_index = requests
|
||||
.iter()
|
||||
.enumerate()
|
||||
.find_map(|(idx, req)| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
(body.contains(resume_summary_marker)
|
||||
&& !body_contains_text(body, SUMMARIZATION_PROMPT)
|
||||
&& !body.contains(POST_AUTO_USER_MSG))
|
||||
.then_some(idx)
|
||||
})
|
||||
.expect("resume request missing after compaction");
|
||||
|
||||
let follow_up_index = requests
|
||||
.iter()
|
||||
.enumerate()
|
||||
|
|
@ -1154,15 +1131,12 @@ async fn auto_compact_runs_after_token_limit_hit() {
|
|||
.then_some(idx)
|
||||
})
|
||||
.expect("follow-up request missing");
|
||||
assert_eq!(follow_up_index, 4, "follow-up request should be last");
|
||||
assert_eq!(follow_up_index, 3, "follow-up request should be last");
|
||||
|
||||
let body_first = requests[0].body_json::<serde_json::Value>().unwrap();
|
||||
let body_auto = requests[auto_compact_index]
|
||||
.body_json::<serde_json::Value>()
|
||||
.unwrap();
|
||||
let body_resume = requests[resume_index]
|
||||
.body_json::<serde_json::Value>()
|
||||
.unwrap();
|
||||
let body_follow_up = requests[follow_up_index]
|
||||
.body_json::<serde_json::Value>()
|
||||
.unwrap();
|
||||
|
|
@ -1201,23 +1175,6 @@ async fn auto_compact_runs_after_token_limit_hit() {
|
|||
"auto compact should send the summarization prompt as a user message",
|
||||
);
|
||||
|
||||
let input_resume = body_resume.get("input").and_then(|v| v.as_array()).unwrap();
|
||||
assert!(
|
||||
input_resume.iter().any(|item| {
|
||||
item.get("type").and_then(|v| v.as_str()) == Some("message")
|
||||
&& item.get("role").and_then(|v| v.as_str()) == Some("user")
|
||||
&& item
|
||||
.get("content")
|
||||
.and_then(|v| v.as_array())
|
||||
.and_then(|arr| arr.first())
|
||||
.and_then(|entry| entry.get("text"))
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|text| text.contains(prefixed_auto_summary))
|
||||
.unwrap_or(false)
|
||||
}),
|
||||
"resume request should include compacted history"
|
||||
);
|
||||
|
||||
let input_follow_up = body_follow_up
|
||||
.get("input")
|
||||
.and_then(|v| v.as_array())
|
||||
|
|
@ -1276,6 +1233,10 @@ async fn auto_compact_persists_rollout_entries() {
|
|||
ev_assistant_message("m3", &auto_summary_payload),
|
||||
ev_completed_with_tokens("r3", 200),
|
||||
]);
|
||||
let sse4 = sse(vec![
|
||||
ev_assistant_message("m4", FINAL_REPLY),
|
||||
ev_completed_with_tokens("r4", 120),
|
||||
]);
|
||||
|
||||
let first_matcher = |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
|
|
@ -1299,12 +1260,19 @@ async fn auto_compact_persists_rollout_entries() {
|
|||
};
|
||||
mount_sse_once_match(&server, third_matcher, sse3).await;
|
||||
|
||||
let fourth_matcher = |req: &wiremock::Request| {
|
||||
let body = std::str::from_utf8(&req.body).unwrap_or("");
|
||||
body.contains(POST_AUTO_USER_MSG) && !body_contains_text(body, SUMMARIZATION_PROMPT)
|
||||
};
|
||||
mount_sse_once_match(&server, fourth_matcher, sse4).await;
|
||||
|
||||
let model_provider = non_openai_model_provider(&server);
|
||||
|
||||
let home = TempDir::new().unwrap();
|
||||
let mut config = load_default_config_for_test(&home);
|
||||
config.model_provider = model_provider;
|
||||
set_test_compact_prompt(&mut config);
|
||||
config.model_auto_compact_token_limit = Some(200_000);
|
||||
let conversation_manager = ConversationManager::with_models_provider(
|
||||
CodexAuth::from_api_key("dummy"),
|
||||
config.model_provider.clone(),
|
||||
|
|
@ -1335,6 +1303,16 @@ async fn auto_compact_persists_rollout_entries() {
|
|||
.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: POST_AUTO_USER_MSG.into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
codex.submit(Op::Shutdown).await.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::ShutdownComplete)).await;
|
||||
|
||||
|
|
@ -1731,6 +1709,8 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
|
|||
ev_assistant_message("m6", FINAL_REPLY),
|
||||
ev_completed_with_tokens("r6", 120),
|
||||
]);
|
||||
let follow_up_user = "FOLLOW_UP_AUTO_COMPACT";
|
||||
let final_user = "FINAL_AUTO_COMPACT";
|
||||
|
||||
mount_sse_sequence(&server, vec![sse1, sse2, sse3, sse4, sse5, sse6]).await;
|
||||
|
||||
|
|
@ -1751,31 +1731,31 @@ async fn auto_compact_allows_multiple_attempts_when_interleaved_with_other_turn_
|
|||
.unwrap()
|
||||
.conversation;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: MULTI_AUTO_MSG.into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let mut auto_compact_lifecycle_events = Vec::new();
|
||||
loop {
|
||||
let event = codex.next_event().await.unwrap();
|
||||
if event.id.starts_with("auto-compact-")
|
||||
&& matches!(
|
||||
event.msg,
|
||||
EventMsg::TaskStarted(_) | EventMsg::TaskComplete(_)
|
||||
)
|
||||
{
|
||||
auto_compact_lifecycle_events.push(event);
|
||||
continue;
|
||||
}
|
||||
if let EventMsg::TaskComplete(_) = &event.msg
|
||||
&& !event.id.starts_with("auto-compact-")
|
||||
{
|
||||
break;
|
||||
for user in [MULTI_AUTO_MSG, follow_up_user, final_user] {
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text { text: user.into() }],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
loop {
|
||||
let event = codex.next_event().await.unwrap();
|
||||
if event.id.starts_with("auto-compact-")
|
||||
&& matches!(
|
||||
event.msg,
|
||||
EventMsg::TaskStarted(_) | EventMsg::TaskComplete(_)
|
||||
)
|
||||
{
|
||||
auto_compact_lifecycle_events.push(event);
|
||||
continue;
|
||||
}
|
||||
if let EventMsg::TaskComplete(_) = &event.msg
|
||||
&& !event.id.starts_with("auto-compact-")
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1821,6 +1801,7 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
|
|||
let context_window = 100;
|
||||
let limit = context_window * 90 / 100;
|
||||
let over_limit_tokens = context_window * 95 / 100 + 1;
|
||||
let follow_up_user = "FOLLOW_UP_AFTER_LIMIT";
|
||||
|
||||
let first_turn = sse(vec![
|
||||
ev_function_call(DUMMY_CALL_ID, DUMMY_FUNCTION_NAME, "{}"),
|
||||
|
|
@ -1873,6 +1854,17 @@ async fn auto_compact_triggers_after_function_call_over_95_percent_usage() {
|
|||
|
||||
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text {
|
||||
text: follow_up_user.into(),
|
||||
}],
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
wait_for_event(&codex, |msg| matches!(msg, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
// Assert first request captured expected user message that triggers function call.
|
||||
let first_request = first_turn_mock.single_request().input();
|
||||
assert!(
|
||||
|
|
@ -1916,6 +1908,7 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
|
|||
|
||||
let first_user = "COUNT_PRE_LAST_REASONING";
|
||||
let second_user = "TRIGGER_COMPACT_AT_LIMIT";
|
||||
let third_user = "AFTER_REMOTE_COMPACT";
|
||||
|
||||
let pre_last_reasoning_content = "a".repeat(2_400);
|
||||
let post_last_reasoning_content = "b".repeat(4_000);
|
||||
|
|
@ -1928,7 +1921,7 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
|
|||
ev_reasoning_item("post-reasoning", &["post"], &[&post_last_reasoning_content]),
|
||||
ev_completed_with_tokens("r2", 80),
|
||||
]);
|
||||
let resume_turn = sse(vec![
|
||||
let third_turn = sse(vec![
|
||||
ev_assistant_message("m4", FINAL_REPLY),
|
||||
ev_completed_with_tokens("r4", 1),
|
||||
]);
|
||||
|
|
@ -1940,8 +1933,8 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
|
|||
first_turn,
|
||||
// Turn 2: reasoning after last user (should be ignored for compaction).
|
||||
second_turn,
|
||||
// Turn 3: resume after remote compaction.
|
||||
resume_turn,
|
||||
// Turn 3: next user turn after remote compaction.
|
||||
third_turn,
|
||||
],
|
||||
)
|
||||
.await;
|
||||
|
|
@ -1973,7 +1966,10 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
|
|||
.expect("build codex")
|
||||
.codex;
|
||||
|
||||
for (idx, user) in [first_user, second_user].into_iter().enumerate() {
|
||||
for (idx, user) in [first_user, second_user, third_user]
|
||||
.into_iter()
|
||||
.enumerate()
|
||||
{
|
||||
codex
|
||||
.submit(Op::UserInput {
|
||||
items: vec![UserInput::Text { text: user.into() }],
|
||||
|
|
@ -1982,10 +1978,10 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
|
|||
.unwrap();
|
||||
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
|
||||
|
||||
if idx == 0 {
|
||||
if idx < 2 {
|
||||
assert!(
|
||||
compact_mock.requests().is_empty(),
|
||||
"remote compaction should not run after the first turn"
|
||||
"remote compaction should not run before the next user turn"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
@ -2006,20 +2002,21 @@ async fn auto_compact_counts_encrypted_reasoning_before_last_user() {
|
|||
assert_eq!(
|
||||
requests.len(),
|
||||
3,
|
||||
"conversation should include two user turns and a post-compaction resume"
|
||||
"conversation should include three user turns"
|
||||
);
|
||||
let second_request_body = requests[1].body_json().to_string();
|
||||
assert!(
|
||||
!second_request_body.contains("REMOTE_COMPACT_SUMMARY"),
|
||||
"second turn should not include compacted history"
|
||||
);
|
||||
let resume_body = requests[2].body_json().to_string();
|
||||
let third_request_body = requests[2].body_json().to_string();
|
||||
assert!(
|
||||
resume_body.contains("REMOTE_COMPACT_SUMMARY") || resume_body.contains(FINAL_REPLY),
|
||||
"resume request should follow remote compact and use compacted history"
|
||||
third_request_body.contains("REMOTE_COMPACT_SUMMARY")
|
||||
|| third_request_body.contains(FINAL_REPLY),
|
||||
"third turn should include compacted history"
|
||||
);
|
||||
assert!(
|
||||
resume_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
|
||||
"resume request should include compaction summary item"
|
||||
third_request_body.contains("ENCRYPTED_COMPACTION_SUMMARY"),
|
||||
"third turn should include compaction summary item"
|
||||
);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue