From ba8b5d9018bf82aa397363e1b9847b01fcd26eb3 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Fri, 6 Feb 2026 13:51:46 -0800 Subject: [PATCH] Treat compaction failure as failure state (#10927) - Return compaction errors from local and remote compaction flows.
- Stop turns/tasks when auto-compaction fails instead of continuing execution. --- .../app-server/tests/suite/v2/compaction.rs | 3 +- codex-rs/core/src/codex.rs | 17 +- codex-rs/core/src/compact.rs | 18 +- codex-rs/core/src/compact_remote.rs | 19 +- codex-rs/core/src/tasks/compact.rs | 4 +- codex-rs/core/tests/suite/compact_remote.rs | 243 ++++++++++++++++-- 6 files changed, 258 insertions(+), 46 deletions(-) diff --git a/codex-rs/app-server/tests/suite/v2/compaction.rs b/codex-rs/app-server/tests/suite/v2/compaction.rs index daf6030dd..5b5faa02d 100644 --- a/codex-rs/app-server/tests/suite/v2/compaction.rs +++ b/codex-rs/app-server/tests/suite/v2/compaction.rs @@ -105,6 +105,7 @@ async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()> #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<()> { skip_if_no_network!(Ok(())); + const REMOTE_AUTO_COMPACT_LIMIT: i64 = 200_000; let server = responses::start_mock_server().await; let sse1 = responses::sse(vec![ @@ -146,7 +147,7 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<() codex_home.path(), &server.uri(), &BTreeMap::default(), - AUTO_COMPACT_LIMIT, + REMOTE_AUTO_COMPACT_LIMIT, Some(true), "openai", COMPACT_PROMPT, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 9bfe3469f..663c65585 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3672,8 +3672,10 @@ pub(crate) async fn run_turn( collaboration_mode_kind: turn_context.collaboration_mode.mode, }); sess.send_event(&turn_context, event).await; - if total_usage_tokens >= auto_compact_limit { - run_auto_compact(&sess,
&turn_context).await; + if total_usage_tokens >= auto_compact_limit + && run_auto_compact(&sess, &turn_context).await.is_err() + { + return None; } let skills_outcome = Some( @@ -3855,7 +3857,9 @@ pub(crate) async fn run_turn( // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. if token_limit_reached && needs_follow_up { - run_auto_compact(&sess, &turn_context).await; + if run_auto_compact(&sess, &turn_context).await.is_err() { + return None; + } continue; } @@ -3913,12 +3917,13 @@ pub(crate) async fn run_turn( last_agent_message } -async fn run_auto_compact(sess: &Arc, turn_context: &Arc) { +async fn run_auto_compact(sess: &Arc, turn_context: &Arc) -> CodexResult<()> { if should_use_remote_compact_task(&turn_context.provider) { - run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await; + run_inline_remote_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?; } else { - run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await; + run_inline_auto_compact_task(Arc::clone(sess), Arc::clone(turn_context)).await?; } + Ok(()) } fn filter_connectors_for_input( diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index bbf5700b9..99f789603 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -39,7 +39,7 @@ pub(crate) fn should_use_remote_compact_task(provider: &ModelProviderInfo) -> bo pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, -) { +) -> CodexResult<()> { let prompt = turn_context.compact_prompt().to_string(); let input = vec![UserInput::Text { text: prompt, @@ -47,27 +47,28 @@ pub(crate) async fn run_inline_auto_compact_task( text_elements: Vec::new(), }]; - run_compact_task_inner(sess, turn_context, input).await; + run_compact_task_inner(sess, turn_context, input).await?; + Ok(()) } pub(crate) async fn run_compact_task( sess: Arc, turn_context: 
Arc, input: Vec, -) { +) -> CodexResult<()> { let start_event = EventMsg::TurnStarted(TurnStartedEvent { model_context_window: turn_context.model_context_window(), collaboration_mode_kind: turn_context.collaboration_mode.mode, }); sess.send_event(&turn_context, start_event).await; - run_compact_task_inner(sess.clone(), turn_context, input).await; + run_compact_task_inner(sess.clone(), turn_context, input).await } async fn run_compact_task_inner( sess: Arc, turn_context: Arc, input: Vec, -) { +) -> CodexResult<()> { let compaction_item = TurnItem::ContextCompaction(ContextCompactionItem::new()); sess.emit_turn_item_started(&turn_context, &compaction_item) .await; @@ -142,7 +143,7 @@ async fn run_compact_task_inner( break; } Err(CodexErr::Interrupted) => { - return; + return Err(CodexErr::Interrupted); } Err(e @ CodexErr::ContextWindowExceeded) => { if turn_input_len > 1 { @@ -158,7 +159,7 @@ async fn run_compact_task_inner( sess.set_total_tokens_full(turn_context.as_ref()).await; let event = EventMsg::Error(e.to_error_event(None)); sess.send_event(&turn_context, event).await; - return; + return Err(e); } Err(e) => { if retries < max_retries { @@ -175,7 +176,7 @@ async fn run_compact_task_inner( } else { let event = EventMsg::Error(e.to_error_event(None)); sess.send_event(&turn_context, event).await; - return; + return Err(e); } } } @@ -210,6 +211,7 @@ async fn run_compact_task_inner( message: "Heads up: Long threads and multiple compactions can cause the model to be less accurate. 
Start a new thread when possible to keep threads small and targeted.".to_string(), }); sess.send_event(&turn_context, warning).await; + Ok(()) } pub fn content_items_to_text(content: &[ContentItem]) -> Option { diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 8aeeb4016..eac3188cd 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -19,27 +19,36 @@ use tracing::info; pub(crate) async fn run_inline_remote_auto_compact_task( sess: Arc, turn_context: Arc, -) { - run_remote_compact_task_inner(&sess, &turn_context).await; +) -> CodexResult<()> { + run_remote_compact_task_inner(&sess, &turn_context).await?; + Ok(()) } -pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Arc) { +pub(crate) async fn run_remote_compact_task( + sess: Arc, + turn_context: Arc, +) -> CodexResult<()> { let start_event = EventMsg::TurnStarted(TurnStartedEvent { model_context_window: turn_context.model_context_window(), collaboration_mode_kind: turn_context.collaboration_mode.mode, }); sess.send_event(&turn_context, start_event).await; - run_remote_compact_task_inner(&sess, &turn_context).await; + run_remote_compact_task_inner(&sess, &turn_context).await } -async fn run_remote_compact_task_inner(sess: &Arc, turn_context: &Arc) { +async fn run_remote_compact_task_inner( + sess: &Arc, + turn_context: &Arc, +) -> CodexResult<()> { if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await { let event = EventMsg::Error( err.to_error_event(Some("Error running remote compact task".to_string())), ); sess.send_event(turn_context, event).await; + return Err(err); } + Ok(()) } async fn run_remote_compact_task_inner_impl( diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index 908b0460f..b56f7b1df 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -31,14 +31,14 @@ impl SessionTask for CompactTask { 1, &[("type", 
"remote")], ); - crate::compact_remote::run_remote_compact_task(session, ctx).await + let _ = crate::compact_remote::run_remote_compact_task(session, ctx).await; } else { let _ = session.services.otel_manager.counter( "codex.task.compact", 1, &[("type", "local")], ); - crate::compact::run_compact_task(session, ctx, input).await + let _ = crate::compact::run_compact_task(session, ctx, input).await; } None diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 75319e42e..47a9a0a11 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -245,12 +245,13 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) .with_config(|config| { config.model_context_window = Some(2_000); + config.model_auto_compact_token_limit = Some(200_000); }), ) .await?; let codex = harness.test().codex.clone(); - let response_log = responses::mount_sse_sequence( + responses::mount_sse_sequence( harness.server(), vec![ sse(vec![ @@ -299,17 +300,139 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R codex.submit(Op::Compact).await?; wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + let compact_request = compact_mock.single_request(); + let user_messages = compact_request.message_input_texts("user"); assert!( - response_log - .function_call_output_text(retained_call_id) - .is_some(), - "expected retained shell call to produce function_call_output before compaction" + user_messages + .iter() + .any(|message| message == first_user_message), + "expected compact request to retain earlier user history" ); assert!( - response_log - .function_call_output_text(trimmed_call_id) - .is_some(), - "expected trimmed shell call to produce function_call_output before compaction" + user_messages + .iter() + .any(|message| message == 
second_user_message), + "expected compact request to retain the user boundary message" + ); + + assert!( + compact_request.has_function_call(retained_call_id) + && compact_request + .function_call_output_text(retained_call_id) + .is_some(), + "expected compact request to keep the older function call/result pair" + ); + assert!( + !compact_request.has_function_call(trimmed_call_id) + && compact_request + .function_call_output_text(trimmed_call_id) + .is_none(), + "expected compact request to drop the trailing function call/result pair past the boundary" + ); + + assert_eq!( + compact_request.inputs_of_type("function_call").len(), + 1, + "expected exactly one function call after trimming" + ); + assert_eq!( + compact_request.inputs_of_type("function_call_output").len(), + 1, + "expected exactly one function call output after trimming" + ); + + Ok(()) +} + +#[cfg_attr(target_os = "windows", ignore)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn auto_remote_compact_trims_function_call_history_to_fit_context_window() -> Result<()> { + skip_if_no_network!(Ok(())); + + let first_user_message = "turn with retained shell call"; + let second_user_message = "turn with trimmed shell call"; + let retained_call_id = "retained-call"; + let trimmed_call_id = "trimmed-call"; + let retained_command = "echo retained-shell-output"; + let trimmed_command = "yes x | head -n 3000"; + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.model_context_window = Some(2_000); + config.model_auto_compact_token_limit = Some(200_000); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + responses::mount_sse_sequence( + harness.server(), + vec![ + sse(vec![ + responses::ev_shell_command_call(retained_call_id, retained_command), + responses::ev_completed_with_tokens("retained-call-response", 100), + ]), + sse(vec![ + 
responses::ev_assistant_message("retained-assistant", "retained complete"), + responses::ev_completed("retained-final-response"), + ]), + sse(vec![ + responses::ev_shell_command_call(trimmed_call_id, trimmed_command), + responses::ev_completed_with_tokens("trimmed-call-response", 100), + ]), + sse(vec![responses::ev_completed_with_tokens( + "trimmed-final-response", + 500_000, + )]), + sse(vec![ + responses::ev_assistant_message("post-compact-assistant", "post compact complete"), + responses::ev_completed("post-compact-final-response"), + ]), + ], + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: first_user_message.into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: second_user_message.into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + let compact_mock = + responses::mount_compact_json_once(harness.server(), serde_json::json!({ "output": [] })) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "turn that triggers auto compact".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + assert_eq!( + compact_mock.requests().len(), + 1, + "expected exactly one remote compact request" ); let compact_request = compact_mock.single_request(); @@ -356,6 +479,88 @@ async fn remote_compact_trims_function_call_history_to_fit_context_window() -> R Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn auto_remote_compact_failure_stops_agent_loop() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = 
TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.model_auto_compact_token_limit = Some(120); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + mount_sse_once( + harness.server(), + sse(vec![ + responses::ev_assistant_message("initial-assistant", "initial turn complete"), + responses::ev_completed_with_tokens("initial-response", 500_000), + ]), + ) + .await; + + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": "invalid compact payload shape" }), + ) + .await; + let post_compact_turn_mock = mount_sse_once( + harness.server(), + sse(vec![ + responses::ev_assistant_message("post-compact-assistant", "should not run"), + responses::ev_completed("post-compact-response"), + ]), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "turn that exceeds token threshold".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "turn that triggers auto compact".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await?; + + let error_message = wait_for_event_match(&codex, |event| match event { + EventMsg::Error(err) => Some(err.message.clone()), + _ => None, + }) + .await; + wait_for_event(&codex, |event| matches!(event, EventMsg::TurnComplete(_))).await; + + assert!( + error_message.contains("Error running remote compact task"), + "expected compact failure error, got {error_message}" + ); + assert_eq!( + compact_mock.requests().len(), + 1, + "expected exactly one remote compact attempt" + ); + assert!( + post_compact_turn_mock.requests().is_empty(), + "expected agent loop to stop after compaction failure" + ); + + Ok(()) +} + #[cfg_attr(target_os 
= "windows", ignore)] #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_trim_estimate_uses_session_base_instructions() -> Result<()> { @@ -806,11 +1011,8 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res let server = wiremock::MockServer::start().await; let stale_developer_message = "STALE_DEVELOPER_INSTRUCTIONS_SHOULD_BE_REMOVED"; - let mut start_builder = test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_config(|config| { - config.features.enable(Feature::RemoteCompaction); - }); + let mut start_builder = + test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); let initial = start_builder.build(&server).await?; let home = initial.home.clone(); let rollout_path = initial @@ -900,11 +1102,8 @@ async fn remote_compact_and_resume_refresh_stale_developer_instructions() -> Res }) .await; - let mut resume_builder = test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_config(|config| { - config.features.enable(Feature::RemoteCompaction); - }); + let mut resume_builder = + test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); let resumed = resume_builder.resume(&server, home, rollout_path).await?; resumed @@ -964,11 +1163,7 @@ async fn remote_compact_refreshes_stale_developer_instructions_without_resume() let server = wiremock::MockServer::start().await; let stale_developer_message = "STALE_DEVELOPER_INSTRUCTIONS_SHOULD_BE_REMOVED"; - let mut builder = test_codex() - .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) - .with_config(|config| { - config.features.enable(Feature::RemoteCompaction); - }); + let mut builder = test_codex().with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()); let test = builder.build(&server).await?; let responses_mock = responses::mount_sse_sequence(