From 38a47700b526a7ae3954d2dd8a233d166bc0a7b8 Mon Sep 17 00:00:00 2001 From: Ahmed Ibrahim Date: Tue, 3 Feb 2026 18:15:55 -0800 Subject: [PATCH] Add thread/compact v2 (#10445) - add `thread/compact` as a trigger-only v2 RPC that submits `Op::Compact` and returns `{}` immediately. - add v2 compaction e2e coverage for success and invalid/unknown thread ids, and update protocol schemas/docs. --- .../schema/json/ClientRequest.json | 35 +++++ .../codex_app_server_protocol.schemas.json | 42 ++++++ .../json/v2/ThreadCompactStartParams.json | 13 ++ .../json/v2/ThreadCompactStartResponse.json | 5 + .../schema/typescript/ClientRequest.ts | 3 +- .../typescript/v2/ThreadCompactStartParams.ts | 5 + .../v2/ThreadCompactStartResponse.ts | 5 + .../schema/typescript/v2/index.ts | 2 + .../src/protocol/common.rs | 4 + .../app-server-protocol/src/protocol/v2.rs | 12 ++ codex-rs/app-server/README.md | 17 +++ .../app-server/src/codex_message_processor.rs | 29 ++++ .../app-server/tests/common/mcp_process.rs | 10 ++ .../app-server/tests/suite/v2/compaction.rs | 132 ++++++++++++++++++ 14 files changed, 313 insertions(+), 1 deletion(-) create mode 100644 codex-rs/app-server-protocol/schema/json/v2/ThreadCompactStartParams.json create mode 100644 codex-rs/app-server-protocol/schema/json/v2/ThreadCompactStartResponse.json create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ThreadCompactStartParams.ts create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/ThreadCompactStartResponse.ts diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 39faeb137..9b676f8e5 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -2235,6 +2235,17 @@ ], "type": "object" }, + "ThreadCompactStartParams": { + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "type": "object" + }, "ThreadForkParams": { "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.", "properties": { @@ -3210,6 +3221,30 @@ "title": "Thread/unarchiveRequest", "type": "object" }, + { + "properties": { + "id": { + "$ref": "#/definitions/RequestId" + }, + "method": { + "enum": [ + "thread/compact/start" + ], + "title": "Thread/compact/startRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadCompactStartParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Thread/compact/startRequest", + "type": "object" + }, { "properties": { "id": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index b813b5946..f0b16e235 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -598,6 +598,30 @@ "title": "Thread/unarchiveRequest", "type": "object" }, + { + "properties": { + "id": { + "$ref": "#/definitions/RequestId" + }, + "method": { + "enum": [ + "thread/compact/start" + ], + "title": "Thread/compact/startRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ThreadCompactStartParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Thread/compact/startRequest", + "type": "object" + }, { "properties": { "id": { @@ -13855,6 +13879,24 @@ "title": "ThreadArchiveResponse", "type": "object" }, + "ThreadCompactStartParams": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadCompactStartParams", + "type": "object" + }, + "ThreadCompactStartResponse": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ThreadCompactStartResponse", + "type": "object" + }, "ThreadForkParams": { "$schema": "http://json-schema.org/draft-07/schema#", "description": "There are two ways to fork a thread: 1. By thread_id: load the thread from disk by thread_id and fork it into a new thread. 2. By path: load the thread from disk by path and fork it into a new thread.\n\nIf using path, the thread_id param will be ignored.\n\nPrefer using thread_id whenever possible.", diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadCompactStartParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadCompactStartParams.json new file mode 100644 index 000000000..a174ff95d --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadCompactStartParams.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadCompactStartParams", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadCompactStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadCompactStartResponse.json new file mode 100644 index 000000000..bb372b6dd --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadCompactStartResponse.json @@ -0,0 +1,5 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "ThreadCompactStartResponse", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts b/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts index d526e5c5e..176f86be5 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts @@ -39,6 +39,7 @@ import type { SkillsListParams } from "./v2/SkillsListParams"; import type { SkillsRemoteReadParams } from "./v2/SkillsRemoteReadParams"; import type { SkillsRemoteWriteParams } from "./v2/SkillsRemoteWriteParams"; import type { ThreadArchiveParams } from "./v2/ThreadArchiveParams"; +import type { ThreadCompactStartParams } from "./v2/ThreadCompactStartParams"; import type { ThreadForkParams } from "./v2/ThreadForkParams"; import type { ThreadListParams } from "./v2/ThreadListParams"; import type { ThreadLoadedListParams } from "./v2/ThreadLoadedListParams"; @@ -54,4 +55,4 @@ import type { TurnStartParams } from "./v2/TurnStartParams"; /** * Request from the client to the server. */ -export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/read", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/write", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, }; +export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/read", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/write", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadCompactStartParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadCompactStartParams.ts new file mode 100644 index 000000000..a60b2c281 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadCompactStartParams.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadCompactStartParams = { threadId: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadCompactStartResponse.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadCompactStartResponse.ts new file mode 100644 index 000000000..3794feb27 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadCompactStartResponse.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadCompactStartResponse = Record; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 83357ff5c..ed4f74d4a 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -128,6 +128,8 @@ export type { TextRange } from "./TextRange"; export type { Thread } from "./Thread"; export type { ThreadArchiveParams } from "./ThreadArchiveParams"; export type { ThreadArchiveResponse } from "./ThreadArchiveResponse"; +export type { ThreadCompactStartParams } from "./ThreadCompactStartParams"; +export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse"; export type { ThreadForkParams } from "./ThreadForkParams"; export type { ThreadForkResponse } from "./ThreadForkResponse"; export type { ThreadItem } from "./ThreadItem"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index bb2510684..c84d28ae8 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -208,6 +208,10 @@ client_request_definitions! { params: v2::ThreadUnarchiveParams, response: v2::ThreadUnarchiveResponse, }, + ThreadCompactStart => "thread/compact/start" { + params: v2::ThreadCompactStartParams, + response: v2::ThreadCompactStartResponse, + }, ThreadRollback => "thread/rollback" { params: v2::ThreadRollbackParams, response: v2::ThreadRollbackResponse, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index dbc51fa9f..8e7beccf7 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1408,6 +1408,18 @@ pub struct ThreadUnarchiveResponse { pub thread: Thread, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadCompactStartParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadCompactStartResponse {} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 78a4f3cf9..055ca6f32 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -85,6 +85,7 @@ Example (from OpenAI's official VSCode extension): - `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success. - `thread/name/set` — set or update a thread’s user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread. - `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success. +- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications. - `thread/rollback` — drop the last N turns from the agent’s in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success. - `turn/start` — add user input to a thread and begin Codex generation; responds with the initial `turn` object and streams `turn/started`, `item/*`, and `turn/completed` notifications. - `turn/interrupt` — request cancellation of an in-flight turn by `(thread_id, turn_id)`; success is an empty `{}` response and the turn finishes with `status: "interrupted"`. @@ -239,6 +240,22 @@ Use `thread/unarchive` to move an archived rollout back into the sessions direct { "id": 24, "result": { "thread": { "id": "thr_b" } } } ``` +### Example: Trigger thread compaction + +Use `thread/compact/start` to trigger manual history compaction for a thread. The request returns immediately with `{}`. + +Progress is emitted as standard `turn/*` and `item/*` notifications on the same `threadId`. Clients should expect a single compaction item: + +- `item/started` with `item: { "type": "contextCompaction", ... }` +- `item/completed` with the same `contextCompaction` item id + +While compaction is running, the thread is effectively in a turn so clients should surface progress UI based on the notifications. + +```json +{ "method": "thread/compact/start", "id": 25, "params": { "threadId": "thr_b" } } +{ "id": 25, "result": {} } +``` + ### Example: Start a turn (send user input) Turns attach user input (text or images) to a thread and trigger Codex generation. The `input` field is a list of discriminated unions: diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 866998a4f..219cb4657 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -104,6 +104,8 @@ use codex_app_server_protocol::SkillsRemoteWriteResponse; use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadArchiveResponse; +use codex_app_server_protocol::ThreadCompactStartParams; +use codex_app_server_protocol::ThreadCompactStartResponse; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadForkResponse; use codex_app_server_protocol::ThreadItem; @@ -455,6 +457,9 @@ impl CodexMessageProcessor { ClientRequest::ThreadUnarchive { request_id, params } => { self.thread_unarchive(request_id, params).await; } + ClientRequest::ThreadCompactStart { request_id, params } => { + self.thread_compact_start(request_id, params).await; + } ClientRequest::ThreadRollback { request_id, params } => { self.thread_rollback(request_id, params).await; } @@ -2094,6 +2099,30 @@ impl CodexMessageProcessor { } } + async fn thread_compact_start(&self, request_id: RequestId, params: ThreadCompactStartParams) { + let ThreadCompactStartParams { thread_id } = params; + + let (_, thread) = match self.load_thread(&thread_id).await { + Ok(v) => v, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match thread.submit(Op::Compact).await { + Ok(_) => { + self.outgoing + .send_response(request_id, ThreadCompactStartResponse {}) + .await; + } + Err(err) => { + self.send_internal_error(request_id, format!("failed to start compaction: {err}")) + .await; + } + } + } + async fn thread_list(&self, request_id: RequestId, params: ThreadListParams) { let ThreadListParams { cursor, diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 4804a8cd3..ba6dc058e 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -50,6 +50,7 @@ use codex_app_server_protocol::SendUserTurnParams; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::SetDefaultModelParams; use codex_app_server_protocol::ThreadArchiveParams; +use codex_app_server_protocol::ThreadCompactStartParams; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadLoadedListParams; @@ -418,6 +419,15 @@ impl McpProcess { self.send_request("thread/unarchive", params).await } + /// Send a `thread/compact/start` JSON-RPC request. + pub async fn send_thread_compact_start_request( + &mut self, + params: ThreadCompactStartParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/compact/start", params).await + } + /// Send a `thread/rollback` JSON-RPC request. pub async fn send_thread_rollback_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/compaction.rs b/codex-rs/app-server/tests/suite/v2/compaction.rs index eeedcb286..4730d920c 100644 --- a/codex-rs/app-server/tests/suite/v2/compaction.rs +++ b/codex-rs/app-server/tests/suite/v2/compaction.rs @@ -15,9 +15,12 @@ use app_test_support::write_chatgpt_auth; use app_test_support::write_mock_responses_config_toml; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ItemStartedNotification; +use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ThreadCompactStartParams; +use codex_app_server_protocol::ThreadCompactStartResponse; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; @@ -39,6 +42,7 @@ use tokio::time::timeout; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); const AUTO_COMPACT_LIMIT: i64 = 1_000; const COMPACT_PROMPT: &str = "Summarize the conversation."; +const INVALID_REQUEST_ERROR_CODE: i64 = -32600; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn auto_compaction_local_emits_started_and_completed_items() -> Result<()> { @@ -196,6 +200,134 @@ async fn auto_compaction_remote_emits_started_and_completed_items() -> Result<() Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn thread_compact_start_triggers_compaction_and_returns_empty_response() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let sse = responses::sse(vec![ + responses::ev_assistant_message("m1", "MANUAL_COMPACT_SUMMARY"), + responses::ev_completed_with_tokens("r1", 200), + ]); + responses::mount_sse_sequence(&server, vec![sse]).await; + + let codex_home = TempDir::new()?; + write_mock_responses_config_toml( + codex_home.path(), + &server.uri(), + &BTreeMap::default(), + AUTO_COMPACT_LIMIT, + None, + "mock_provider", + COMPACT_PROMPT, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_id = start_thread(&mut mcp).await?; + let compact_id = mcp + .send_thread_compact_start_request(ThreadCompactStartParams { + thread_id: thread_id.clone(), + }) + .await?; + let compact_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(compact_id)), + ) + .await??; + let _compact: ThreadCompactStartResponse = + to_response::(compact_resp)?; + + let started = wait_for_context_compaction_started(&mut mcp).await?; + let completed = wait_for_context_compaction_completed(&mut mcp).await?; + + let ThreadItem::ContextCompaction { id: started_id } = started.item else { + unreachable!("started item should be context compaction"); + }; + let ThreadItem::ContextCompaction { id: completed_id } = completed.item else { + unreachable!("completed item should be context compaction"); + }; + + assert_eq!(started.thread_id, thread_id); + assert_eq!(completed.thread_id, thread_id); + assert_eq!(started_id, completed_id); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn thread_compact_start_rejects_invalid_thread_id() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let codex_home = TempDir::new()?; + write_mock_responses_config_toml( + codex_home.path(), + &server.uri(), + &BTreeMap::default(), + AUTO_COMPACT_LIMIT, + None, + "mock_provider", + COMPACT_PROMPT, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_thread_compact_start_request(ThreadCompactStartParams { + thread_id: "not-a-thread-id".to_string(), + }) + .await?; + let error: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + + assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE); + assert!(error.error.message.contains("invalid thread id")); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn thread_compact_start_rejects_unknown_thread_id() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + let codex_home = TempDir::new()?; + write_mock_responses_config_toml( + codex_home.path(), + &server.uri(), + &BTreeMap::default(), + AUTO_COMPACT_LIMIT, + None, + "mock_provider", + COMPACT_PROMPT, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_thread_compact_start_request(ThreadCompactStartParams { + thread_id: "67e55044-10b1-426f-9247-bb680e5fe0c8".to_string(), + }) + .await?; + let error: JSONRPCError = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + + assert_eq!(error.error.code, INVALID_REQUEST_ERROR_CODE); + assert!(error.error.message.contains("thread not found")); + + Ok(()) +} + async fn start_thread(mcp: &mut McpProcess) -> Result { let thread_id = mcp .send_thread_start_request(ThreadStartParams {