Treat compaction failure as failure state (#10927)

- Return compaction errors from local and remote compaction flows.
- Stop turns/tasks when auto-compaction fails instead of continuing
  execution.
This commit is contained in:
Ahmed Ibrahim 2026-02-06 13:51:46 -08:00 committed by GitHub
parent 1751116ec6
commit ba8b5d9018
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 258 additions and 46 deletions

View file

@ -105,6 +105,7 @@ async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()>
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()> {
skip_if_no_network!(Ok(()));
const REMOTE_AUTO_COMPACT_LIMIT: i64 = 200_000;
let server = responses::start_mock_server().await;
let sse1 = responses::sse(vec![
@ -146,7 +147,7 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()
codex_home.path(),
&server.uri(),
&BTreeMap::default(),
AUTO_COMPACT_LIMIT,
REMOTE_AUTO_COMPACT_LIMIT,
Some(true),
"openai",
COMPACT_PROMPT,

View file

@ -3672,8 +3672,10 @@ pub(crate) async fn run_turn(
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, event).await;
if total_usage_tokens >= auto_compact_limit {
run_auto_compact(&sess, &turn_context).await;
if total_usage_tokens >= auto_compact_limit
&& run_auto_compact(&sess, &turn_context).await.is_err()
{
return None;
}
let skills_outcome = Some(
@ -3855,7 +3857,9 @@ pub(crate) async fn run_turn(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached && needs_follow_up {
run_auto_compact(&sess, &turn_context).await;
if run_auto_compact(&sess, &turn_context).await.is_err() {
return None;
}
continue;
}
@ -3913,12 +3917,13 @@ pub(crate) async fn run_turn(
last_agent_message
}
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
async fn run_auto_compact(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) -> CodexResult<()> {
if should_use_remote_compact_task(&turn_context.provider) {
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
} else {
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await;
run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?;
}
Ok(())
}
fn filter_connectors_for_input(

View file

@ -39,7 +39,7 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
) -> CodexResult<()> {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text {
text: prompt,
@ -47,27 +47,28 @@ pub(crate) async fn run_inline_auto_compact_task(
text_elements: Vec::new(),
}];
run_compact_task_inner(sess, turn_context, input).await;
run_compact_task_inner(sess, turn_context, input).await?;
Ok(())
}
pub(crate) async fn run_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
) -> CodexResult<()> {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await;
run_compact_task_inner(sess.clone(), turn_context, input).await
}
async fn run_compact_task_inner(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) {
) -> CodexResult<()> {
let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new());
sess.emit_turn_item_started(&turn_context, &compaction_item)
.await;
@ -142,7 +143,7 @@ async fn run_compact_task_inner(
break;
}
Err(CodexErr::Interrupted) => {
return;
return Err(CodexErr::Interrupted);
}
Err(e @ CodexErr::ContextWindowExceeded) => {
if turn_input_len > 1 {
@ -158,7 +159,7 @@ async fn run_compact_task_inner(
sess.set_total_tokens_full(turn_context.as_ref()).await;
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return;
return Err(e);
}
Err(e) => {
if retries < max_retries {
@ -175,7 +176,7 @@ async fn run_compact_task_inner(
} else {
let event = EventMsg::Error(e.to_error_event(None));
sess.send_event(&turn_context, event).await;
return;
return Err(e);
}
}
}
@ -210,6 +211,7 @@ async fn run_compact_task_inner(
message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. Start a new thread when possible to keep threads small and targeted.".to_string(),
});
sess.send_event(&turn_context, warning).await;
Ok(())
}
pub fn content_items_to_text(content: &[ContentItem]) -> Option<String> {

View file

@ -19,27 +19,36 @@ use tracing::info;
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
run_remote_compact_task_inner(&sess, &turn_context).await;
) -> CodexResult<()> {
run_remote_compact_task_inner(&sess, &turn_context).await?;
Ok(())
}
pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
pub(crate) async fn run_remote_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> CodexResult<()> {
let start_event = EventMsg::TurnStarted(TurnStartedEvent {
model_context_window: turn_context.model_context_window(),
collaboration_mode_kind: turn_context.collaboration_mode.mode,
});
sess.send_event(&turn_context, start_event).await;
run_remote_compact_task_inner(&sess, &turn_context).await;
run_remote_compact_task_inner(&sess, &turn_context).await
}
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
async fn run_remote_compact_task_inner(
    sess: &Arc<Session>,
    turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
    // Emit a user-visible Error event first, then propagate the failure to
    // the caller so auto-compaction failure can stop the turn/task instead of
    // being silently swallowed.
    if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
        let event = EventMsg::Error(
            err.to_error_event(Some("Error running remote compact task".to_string())),
        );
        sess.send_event(turn_context, event).await;
        return Err(err);
    }
    Ok(())
}
async fn run_remote_compact_task_inner_impl(

View file

@ -31,14 +31,14 @@ impl SessionTask for CompactTask {
1,
&[("type", "remote")],
);
crate::compact_remote::run_remote_compact_task(session, ctx).await
let _ = crate::compact_remote::run_remote_compact_task(session, ctx).await;
} else {
let _ = session.services.otel_manager.counter(
"codex.task.compact",
1,
&[("type", "local")],
);
crate::compact::run_compact_task(session, ctx, input).await
let _ = crate::compact::run_compact_task(session, ctx, input).await;
}
None

View file

@ -245,12 +245,13 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(2_000);
config.model_auto_compact_token_limit = Some(200_000);
}),
)
.await?;
let codex = harness.test().codex.clone();
let response_log = responses::mount_sse_sequence(
responses::mount_sse_sequence(
harness.server(),
vec![
sse(vec![
@ -299,17 +300,139 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R
codex.submit(Op::Compact).await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_request = compact_mock.single_request();
let user_messages = compact_request.message_input_texts("user");
assert!(
response_log
.function_call_output_text(retained_call_id)
.is_some(),
"expected retained shell call to produce function_call_output before compaction"
user_messages
.iter()
.any(|message| message == first_user_message),
"expected compact request to retain earlier user history"
);
assert!(
response_log
.function_call_output_text(trimmed_call_id)
.is_some(),
"expected trimmed shell call to produce function_call_output before compaction"
user_messages
.iter()
.any(|message| message == second_user_message),
"expected compact request to retain the user boundary message"
);
assert!(
compact_request.has_function_call(retained_call_id)
&& compact_request
.function_call_output_text(retained_call_id)
.is_some(),
"expected compact request to keep the older function call/result pair"
);
assert!(
!compact_request.has_function_call(trimmed_call_id)
&& compact_request
.function_call_output_text(trimmed_call_id)
.is_none(),
"expected compact request to drop the trailing function call/result pair past the boundary"
);
assert_eq!(
compact_request.inputs_of_type("function_call").len(),
1,
"expected exactly one function call after trimming"
);
assert_eq!(
compact_request.inputs_of_type("function_call_output").len(),
1,
"expected exactly one function call output after trimming"
);
Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> {
skip_if_no_network!(Ok(()));
let first_user_message = "turn with retained shell call";
let second_user_message = "turn with trimmed shell call";
let retained_call_id = "retained-call";
let trimmed_call_id = "trimmed-call";
let retained_command = "echo retained-shell-output";
let trimmed_command = "yes x | head -n 3000";
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.model_context_window = Some(2_000);
config.model_auto_compact_token_limit = Some(200_000);
}),
)
.await?;
let codex = harness.test().codex.clone();
responses::mount_sse_sequence(
harness.server(),
vec![
sse(vec![
responses::ev_shell_command_call(retained_call_id, retained_command),
responses::ev_completed_with_tokens("retained-call-response", 100),
]),
sse(vec![
responses::ev_assistant_message("retained-assistant", "retained complete"),
responses::ev_completed("retained-final-response"),
]),
sse(vec![
responses::ev_shell_command_call(trimmed_call_id, trimmed_command),
responses::ev_completed_with_tokens("trimmed-call-response", 100),
]),
sse(vec![responses::ev_completed_with_tokens(
"trimmed-final-response",
500_000,
)]),
sse(vec![
responses::ev_assistant_message("post-compact-assistant", "post compact complete"),
responses::ev_completed("post-compact-final-response"),
]),
],
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: first_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: second_user_message.into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
let compact_mock =
responses::mount_compact_json_once(harness.server(), serde_json::json!({ "output": [] }))
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "turn that triggers auto compact".into(),
text_elements: Vec::new(),
}],
final_output_json_schema: None,
})
.await?;
wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
assert_eq!(
compact_mock.requests().len(),
1,
"expected exactly one remote compact request"
);
let compact_request = compact_mock.single_request();
@ -356,6 +479,88 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> {
    skip_if_no_network!(Ok(()));
    // Tiny auto-compact limit: the first turn below reports 500_000 tokens,
    // which exceeds 120, so the second turn must attempt auto-compaction.
    let harness = TestCodexHarness::with_builder(
        test_codex()
            .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
            .with_config(|config| {
                config.model_auto_compact_token_limit = Some(120);
            }),
    )
    .await?;
    let codex = harness.test().codex.clone();
    // First turn: completes normally but reports token usage far over the
    // auto-compact threshold.
    mount_sse_once(
        harness.server(),
        sse(vec![
            responses::ev_assistant_message("initial-assistant", "initial turn complete"),
            responses::ev_completed_with_tokens("initial-response", 500_000),
        ]),
    )
    .await;
    // The remote compaction endpoint returns a payload with the wrong shape
    // ("output" as a string rather than an array), forcing the compaction
    // attempt to fail.
    let compact_mock = responses::mount_compact_json_once(
        harness.server(),
        serde_json::json!({ "output": "invalid compact payload shape" }),
    )
    .await;
    // Sentinel mock: if the agent loop (incorrectly) kept running after the
    // failed compaction, it would hit this response; asserted empty below.
    let post_compact_turn_mock = mount_sse_once(
        harness.server(),
        sse(vec![
            responses::ev_assistant_message("post-compact-assistant", "should not run"),
            responses::ev_completed("post-compact-response"),
        ]),
    )
    .await;
    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "turn that exceeds token threshold".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
    // Second turn: crosses the threshold, triggering the (failing) remote
    // auto-compaction.
    codex
        .submit(Op::UserInput {
            items: vec![UserInput::Text {
                text: "turn that triggers auto compact".into(),
                text_elements: Vec::new(),
            }],
            final_output_json_schema: None,
        })
        .await?;
    // The compaction failure must surface as an Error event before the turn
    // finishes.
    let error_message = wait_for_event_match(&codex, |event| match event {
        EventMsg::Error(err) => Some(err.message.clone()),
        _ => None,
    })
    .await;
    wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await;
    assert!(
        error_message.contains("Error running remote compact task"),
        "expected compact failure error, got {error_message}"
    );
    assert_eq!(
        compact_mock.requests().len(),
        1,
        "expected exactly one remote compact attempt"
    );
    // The sentinel mock must never be hit: a failed compaction stops the loop.
    assert!(
        post_compact_turn_mock.requests().is_empty(),
        "expected agent loop to stop after compaction failure"
    );
    Ok(())
}
#[cfg_attr(target_os = "windows", ignore)]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_trim_estimate_uses_session_base_instructions() -> Result<()> {
@ -806,11 +1011,8 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res
let server = wiremock::MockServer::start().await;
let stale_developer_message = "STALE_DEVELOPER_INSTRUCTIONS_SHOULD_BE_REMOVED";
let mut start_builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.features.enable(Feature::RemoteCompaction);
});
let mut start_builder =
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let initial = start_builder.build(&server).await?;
let home = initial.home.clone();
let rollout_path = initial
@ -900,11 +1102,8 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res
})
.await;
let mut resume_builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.features.enable(Feature::RemoteCompaction);
});
let mut resume_builder =
test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let resumed = resume_builder.resume(&server, home, rollout_path).await?;
resumed
@ -964,11 +1163,7 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume()
let server = wiremock::MockServer::start().await;
let stale_developer_message = "STALE_DEVELOPER_INSTRUCTIONS_SHOULD_BE_REMOVED";
let mut builder = test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.features.enable(Feature::RemoteCompaction);
});
let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing());
let test = builder.build(&server).await?;
let responses_mock = responses::mount_sse_sequence(