Run remote auto compaction (#6879)

This commit is contained in:
pakrym-oai 2025-11-19 00:43:58 -08:00 committed by GitHub
parent 0440a3f105
commit 75f38f16dd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 124 additions and 31 deletions

View file

@ -7,6 +7,9 @@ use std::sync::atomic::AtomicU64;
use crate::AuthManager;
use crate::client_common::REVIEW_PROMPT;
use crate::compact;
use crate::compact::run_inline_auto_compact_task;
use crate::compact::should_use_remote_compact_task;
use crate::compact_remote::run_inline_remote_auto_compact_task;
use crate::features::Feature;
use crate::function_tool::FunctionCallError;
use crate::parse_command::parse_command;
@ -1890,7 +1893,12 @@ pub(crate) async fn run_task(
// as long as compaction works well in getting us way below the token limit, we shouldn't worry about being in an infinite loop.
if token_limit_reached {
compact::run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
if should_use_remote_compact_task(&sess).await {
run_inline_remote_auto_compact_task(sess.clone(), turn_context.clone())
.await;
} else {
run_inline_auto_compact_task(sess.clone(), turn_context.clone()).await;
}
continue;
}

View file

@ -7,6 +7,7 @@ use crate::codex::TurnContext;
use crate::codex::get_last_assistant_message_from_turn;
use crate::error::CodexErr;
use crate::error::Result as CodexResult;
use crate::features::Feature;
use crate::protocol::AgentMessageEvent;
use crate::protocol::CompactedItem;
use crate::protocol::ErrorEvent;
@ -18,6 +19,7 @@ use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
use crate::util::backoff;
use codex_app_server_protocol::AuthMode;
use codex_protocol::items::TurnItem;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseInputItem;
@ -31,12 +33,22 @@ pub const SUMMARIZATION_PROMPT: &str = include_str!("../templates/compact/prompt
pub const SUMMARY_PREFIX: &str = include_str!("../templates/compact/summary_prefix.md");
const COMPACT_USER_MESSAGE_MAX_TOKENS: usize = 20_000;
pub(crate) async fn should_use_remote_compact_task(session: &Session) -> bool {
session
.services
.auth_manager
.auth()
.is_some_and(|auth| auth.mode == AuthMode::ChatGPT)
&& session.enabled(Feature::RemoteCompaction).await
}
pub(crate) async fn run_inline_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) {
let prompt = turn_context.compact_prompt().to_string();
let input = vec![UserInput::Text { text: prompt }];
run_compact_task_inner(sess, turn_context, input).await;
}
@ -44,13 +56,12 @@ pub(crate) async fn run_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
input: Vec<UserInput>,
) -> Option<String> {
) {
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;
run_compact_task_inner(sess.clone(), turn_context, input).await;
None
}
async fn run_compact_task_inner(

View file

@ -12,34 +12,32 @@ use crate::protocol::RolloutItem;
use crate::protocol::TaskStartedEvent;
use codex_protocol::models::ResponseItem;
pub(crate) async fn run_remote_compact_task(
pub(crate) async fn run_inline_remote_auto_compact_task(
sess: Arc<Session>,
turn_context: Arc<TurnContext>,
) -> Option<String> {
) {
run_remote_compact_task_inner(&sess, &turn_context).await;
}
pub(crate) async fn run_remote_compact_task(sess: Arc<Session>, turn_context: Arc<TurnContext>) {
let start_event = EventMsg::TaskStarted(TaskStartedEvent {
model_context_window: turn_context.client.get_model_context_window(),
});
sess.send_event(&turn_context, start_event).await;
match run_remote_compact_task_inner(&sess, &turn_context).await {
Ok(()) => {
let event = EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
});
sess.send_event(&turn_context, event).await;
}
Err(err) => {
let event = EventMsg::Error(ErrorEvent {
message: err.to_string(),
});
sess.send_event(&turn_context, event).await;
}
}
None
run_remote_compact_task_inner(&sess, &turn_context).await;
}
async fn run_remote_compact_task_inner(
async fn run_remote_compact_task_inner(sess: &Arc<Session>, turn_context: &Arc<TurnContext>) {
if let Err(err) = run_remote_compact_task_inner_impl(sess, turn_context).await {
let event = EventMsg::Error(ErrorEvent {
message: format!("Error running remote compact task: {err}"),
});
sess.send_event(turn_context, event).await;
}
}
async fn run_remote_compact_task_inner_impl(
sess: &Arc<Session>,
turn_context: &Arc<TurnContext>,
) -> CodexResult<()> {
@ -84,5 +82,11 @@ async fn run_remote_compact_task_inner(
};
sess.persist_rollout_items(&[RolloutItem::Compacted(compacted_item)])
.await;
let event = EventMsg::AgentMessage(AgentMessageEvent {
message: "Compact task completed".to_string(),
});
sess.send_event(turn_context, event).await;
Ok(())
}

View file

@ -3,10 +3,8 @@ use std::sync::Arc;
use super::SessionTask;
use super::SessionTaskContext;
use crate::codex::TurnContext;
use crate::features::Feature;
use crate::state::TaskKind;
use async_trait::async_trait;
use codex_app_server_protocol::AuthMode;
use codex_protocol::user_input::UserInput;
use tokio_util::sync::CancellationToken;
@ -27,16 +25,12 @@ impl SessionTask for CompactTask {
_cancellation_token: CancellationToken,
) -> Option<String> {
let session = session.clone_session();
if session
.services
.auth_manager
.auth()
.is_some_and(|auth| auth.mode == AuthMode::ChatGPT)
&& session.enabled(Feature::RemoteCompaction).await
{
if crate::compact::should_use_remote_compact_task(&session).await {
crate::compact_remote::run_remote_compact_task(session, ctx).await
} else {
crate::compact::run_compact_task(session, ctx, input).await
}
None
}
}

View file

@ -460,6 +460,13 @@ pub fn ev_apply_patch_function_call(call_id: &str, patch: &str) -> Value {
})
}
pub fn ev_shell_command_call(call_id: &str, command: &str) -> Value {
let args = serde_json::json!({ "command": command });
let arguments = serde_json::to_string(&args).expect("serialize shell arguments");
ev_function_call(call_id, "shell_command", &arguments)
}
pub fn ev_apply_patch_shell_call(call_id: &str, patch: &str) -> Value {
let args = serde_json::json!({ "command": ["apply_patch", patch] });
let arguments = serde_json::to_string(&args).expect("serialize apply_patch arguments");

View file

@ -13,10 +13,13 @@ use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::user_input::UserInput;
use core_test_support::responses;
use core_test_support::responses::mount_sse_once;
use core_test_support::responses::sse;
use core_test_support::skip_if_no_network;
use core_test_support::test_codex::TestCodexHarness;
use core_test_support::test_codex::test_codex;
use core_test_support::wait_for_event;
use core_test_support::wait_for_event_match;
use pretty_assertions::assert_eq;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
@ -125,6 +128,72 @@ async fn remote_compact_replaces_history_for_followups() -> Result<()> {
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_runs_automatically() -> Result<()> {
skip_if_no_network!(Ok(()));
let harness = TestCodexHarness::with_builder(
test_codex()
.with_auth(CodexAuth::create_dummy_chatgpt_auth_for_testing())
.with_config(|config| {
config.features.enable(Feature::RemoteCompaction);
}),
)
.await?;
let codex = harness.test().codex.clone();
mount_sse_once(
harness.server(),
sse(vec![
responses::ev_shell_command_call("m1", "echo 'hi'"),
responses::ev_completed_with_tokens("resp-1", 100000000), // over token limit
]),
)
.await;
let responses_mock = mount_sse_once(
harness.server(),
responses::sse(vec![
responses::ev_assistant_message("m2", "AFTER_COMPACT_REPLY"),
responses::ev_completed("resp-2"),
]),
)
.await;
let compacted_history = vec![ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "REMOTE_COMPACTED_SUMMARY".to_string(),
}],
}];
let compact_mock = responses::mount_compact_json_once(
harness.server(),
serde_json::json!({ "output": compacted_history.clone() }),
)
.await;
codex
.submit(Op::UserInput {
items: vec![UserInput::Text {
text: "hello remote compact".into(),
}],
})
.await?;
let message = wait_for_event_match(&codex, |ev| match ev {
EventMsg::AgentMessage(ev) => Some(ev.message.clone()),
_ => None,
})
.await;
wait_for_event(&codex, |ev| matches!(ev, EventMsg::TaskComplete(_))).await;
assert_eq!(message, "Compact task completed");
assert_eq!(compact_mock.requests().len(), 1);
let follow_up_body = responses_mock.single_request().body_json().to_string();
assert!(follow_up_body.contains("REMOTE_COMPACTED_SUMMARY"));
Ok(())
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn remote_compact_persists_replacement_history_in_rollout() -> Result<()> {
skip_if_no_network!(Ok(()));