From 75f38f16ddd1146c4e57ae796c5a415089ad86bc Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 19 Nov 2025 00:43:58 -0800 Subject: [PATCH] Run remote auto compaction (#6879) --- codex-rs/core/src/codex.rs | 10 ++- codex-rs/core/src/compact.rs | 15 ++++- codex-rs/core/src/compact_remote.rs | 42 +++++++------ codex-rs/core/src/tasks/compact.rs | 12 +--- codex-rs/core/tests/common/responses.rs | 7 +++ codex-rs/core/tests/suite/compact_remote.rs | 69 +++++++++++++++++++++ 6 files changed, 124 insertions(+), 31 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7cf90ad1e..5156ed645 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -7,6 +7,9 @@ use std::sync::atomic::AtomicU64; use crate::AuthManager; use crate::client_common::REVIEW_PROMPT; use crate::compact; +use crate::compact::run_inline_auto_compact_task; +use crate::compact::should_use_remote_compact_task; +use crate::compact_remote::run_inline_remote_auto_compact_task; use crate::features::Feature; use crate::function_tool::FunctionCallError; use crate::parse_command::parse_command; @@ -1890,7 +1893,12 @@ pub(crate) async fn run_task( // as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop. if token_limit_reached { - compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await; + if should_use_remote_compact_task(&sess).await { + run_inline_remote_auto_compact_task(sess.clone(), turn_context.clone()) + .await; + } else { + run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await; + } continue; } diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 33d38091f..b5ece9089 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -7,6 +7,7 @@ use crate::codex::TurnContext; use crate::codex::get_last_assistant_message_from_turn; use crate::error::CodexErr; use crate::error::Result as CodexResult; +use crate::features::Feature; use crate::protocol::AgentMessageEvent; use crate::protocol::CompactedItem; use crate::protocol::ErrorEvent; @@ -18,6 +19,7 @@ use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::truncate_text; use crate::util::backoff; +use codex_app_server_protocol::AuthMode; use codex_protocol::items::TurnItem; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseInputItem; @@ -31,12 +33,22 @@ pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md"); const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000; +pub(crate) async fn should_use_remote_compact_task(session: &Session) -> bool { + session + .services + .auth_manager + .auth() + .is_some_and(|auth| auth.mode == AuthMode::ChatGPT) + && session.enabled(Feature::RemoteCompaction).await +} + pub(crate) async fn run_inline_auto_compact_task( sess: Arc, turn_context: Arc, ) { let prompt = turn_context.compact_prompt().to_string(); let input = vec![UserInput::Text { text: prompt }]; + run_compact_task_inner(sess, turn_context, input).await; } @@ -44,13 +56,12 @@ pub(crate) async fn run_compact_task( sess: Arc, turn_context: Arc, input: Vec, -) -> Option { +) { let start_event = EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }); sess.send_event(&turn_context, start_event).await; run_compact_task_inner(sess.clone(), turn_context, input).await; - None } async fn run_compact_task_inner( diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 75b81d408..51c35baf3 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -12,34 +12,32 @@ use crate::protocol::RolloutItem; use crate::protocol::TaskStartedEvent; use codex_protocol::models::ResponseItem; -pub(crate) async fn run_remote_compact_task( +pub(crate) async fn run_inline_remote_auto_compact_task( sess: Arc, turn_context: Arc, -) -> Option { +) { + run_remote_compact_task_inner(&sess, &turn_context).await; +} + +pub(crate) async fn run_remote_compact_task(sess: Arc, turn_context: Arc) { let start_event = EventMsg::TaskStarted(TaskStartedEvent { model_context_window: turn_context.client.get_model_context_window(), }); sess.send_event(&turn_context, start_event).await; - match run_remote_compact_task_inner(&sess, &turn_context).await { - Ok(()) => { - let event = EventMsg::AgentMessage(AgentMessageEvent { - message: "Compact task completed".to_string(), - }); - sess.send_event(&turn_context, event).await; - } - Err(err) => { - let event = EventMsg::Error(ErrorEvent { - message: err.to_string(), - }); - sess.send_event(&turn_context, event).await; - } - } - - None + run_remote_compact_task_inner(&sess, &turn_context).await; } -async fn run_remote_compact_task_inner( +async fn run_remote_compact_task_inner(sess: &Arc, turn_context: &Arc) { + if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await { + let event = EventMsg::Error(ErrorEvent { + message: format!("Error running remote compact task: {err}"), + }); + sess.send_event(turn_context, event).await; + } +} + +async fn run_remote_compact_task_inner_impl( sess: &Arc, turn_context: &Arc, ) -> CodexResult<()> { @@ -84,5 +82,11 @@ async fn run_remote_compact_task_inner( }; sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)]) .await; + + let event = EventMsg::AgentMessage(AgentMessageEvent { + message: "Compact task completed".to_string(), + }); + sess.send_event(turn_context, event).await; + Ok(()) } diff --git a/codex-rs/core/src/tasks/compact.rs b/codex-rs/core/src/tasks/compact.rs index 4f161267a..893c0c476 100644 --- a/codex-rs/core/src/tasks/compact.rs +++ b/codex-rs/core/src/tasks/compact.rs @@ -3,10 +3,8 @@ use std::sync::Arc; use super::SessionTask; use super::SessionTaskContext; use crate::codex::TurnContext; -use crate::features::Feature; use crate::state::TaskKind; use async_trait::async_trait; -use codex_app_server_protocol::AuthMode; use codex_protocol::user_input::UserInput; use tokio_util::sync::CancellationToken; @@ -27,16 +25,12 @@ impl SessionTask for CompactTask { _cancellation_token: CancellationToken, ) -> Option { let session = session.clone_session(); - if session - .services - .auth_manager - .auth() - .is_some_and(|auth| auth.mode == AuthMode::ChatGPT) - && session.enabled(Feature::RemoteCompaction).await - { + if crate::compact::should_use_remote_compact_task(&session).await { crate::compact_remote::run_remote_compact_task(session, ctx).await } else { crate::compact::run_compact_task(session, ctx, input).await } + + None } } diff --git a/codex-rs/core/tests/common/responses.rs b/codex-rs/core/tests/common/responses.rs index 3ebb28355..7c5a103f4 100644 --- a/codex-rs/core/tests/common/responses.rs +++ b/codex-rs/core/tests/common/responses.rs @@ -460,6 +460,13 @@ pub fn ev_apply_patch_function_call(call_id: &str, patch: &str) -> Value { }) } +pub fn ev_shell_command_call(call_id: &str, command: &str) -> Value { + let args = serde_json::json!({ "command": command }); + let arguments = serde_json::to_string(&args).expect("serialize shell arguments"); + + ev_function_call(call_id, "shell_command", &arguments) +} + pub fn ev_apply_patch_shell_call(call_id: &str, patch: &str) -> Value { let args = serde_json::json!({ "command": ["apply_patch", patch] }); let arguments = serde_json::to_string(&args).expect("serialize apply_patch arguments"); diff --git a/codex-rs/core/tests/suite/compact_remote.rs b/codex-rs/core/tests/suite/compact_remote.rs index 4bc1af9e1..dc88bc574 100644 --- a/codex-rs/core/tests/suite/compact_remote.rs +++ b/codex-rs/core/tests/suite/compact_remote.rs @@ -13,10 +13,13 @@ use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; use core_test_support::responses; +use core_test_support::responses::mount_sse_once; +use core_test_support::responses::sse; use core_test_support::skip_if_no_network; use core_test_support::test_codex::TestCodexHarness; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; +use core_test_support::wait_for_event_match; use pretty_assertions::assert_eq; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -125,6 +128,72 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn remote_compact_runs_automatically() -> Result<()> { + skip_if_no_network!(Ok(())); + + let harness = TestCodexHarness::with_builder( + test_codex() + .with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing()) + .with_config(|config| { + config.features.enable(Feature::RemoteCompaction); + }), + ) + .await?; + let codex = harness.test().codex.clone(); + + mount_sse_once( + harness.server(), + sse(vec![ + responses::ev_shell_command_call("m1", "echo 'hi'"), + responses::ev_completed_with_tokens("resp-1", 100000000), // over token limit + ]), + ) + .await; + let responses_mock = mount_sse_once( + harness.server(), + responses::sse(vec![ + responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"), + responses::ev_completed("resp-2"), + ]), + ) + .await; + + let compacted_history = vec![ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "REMOTE_COMPACTED_SUMMARY".to_string(), + }], + }]; + let compact_mock = responses::mount_compact_json_once( + harness.server(), + serde_json::json!({ "output": compacted_history.clone() }), + ) + .await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "hello remote compact".into(), + }], + }) + .await?; + let message = wait_for_event_match(&codex, |ev| match ev { + EventMsg::AgentMessage(ev) => Some(ev.message.clone()), + _ => None, + }) + .await; + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await; + + assert_eq!(message, "Compact task completed"); + assert_eq!(compact_mock.requests().len(), 1); + let follow_up_body = responses_mock.single_request().body_json().to_string(); + assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY")); + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> { skip_if_no_network!(Ok(()));