app-server: Emit thread archive/unarchive notifications (#12030)

* Add v2 server notifications `thread/archived` and `thread/unarchived`
with a `threadId` payload.
* Wire new events into `thread/archive` and `thread/unarchive` success
paths.
* Update app-server protocol/schema/docs accordingly.

Testing:
- Updated archive/unarchive end-to-end tests to verify both
notifications are emitted with the expected thread id payload.
This commit is contained in:
Ruslan Nigmatullin 2026-02-17 14:53:58 -08:00 committed by GitHub
parent 709e2133bb
commit 31cbebd3c2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 234 additions and 8 deletions

View file

@ -4756,4 +4756,4 @@
}
],
"title": "ClientRequest"
}
}

View file

@ -6539,6 +6539,17 @@
],
"type": "object"
},
"ThreadArchivedNotification": {
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"type": "object"
},
"ThreadId": {
"type": "string"
},
@ -7083,6 +7094,17 @@
],
"type": "object"
},
"ThreadUnarchivedNotification": {
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"type": "object"
},
"TokenUsage": {
"properties": {
"cached_input_tokens": {
@ -8076,6 +8098,46 @@
"title": "Thread/startedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/archived"
],
"title": "Thread/archivedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadArchivedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/archivedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/unarchived"
],
"title": "Thread/unarchivedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/ThreadUnarchivedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/unarchivedNotification",
"type": "object"
},
{
"properties": {
"method": {

View file

@ -8063,6 +8063,46 @@
"title": "Thread/startedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/archived"
],
"title": "Thread/archivedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadArchivedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/archivedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"thread/unarchived"
],
"title": "Thread/unarchivedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/ThreadUnarchivedNotification"
}
},
"required": [
"method",
"params"
],
"title": "Thread/unarchivedNotification",
"type": "object"
},
{
"properties": {
"method": {
@ -15107,6 +15147,19 @@
"title": "ThreadArchiveResponse",
"type": "object"
},
"ThreadArchivedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadArchivedNotification",
"type": "object"
},
"ThreadCompactStartParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
@ -16315,6 +16368,19 @@
"title": "ThreadUnarchiveResponse",
"type": "object"
},
"ThreadUnarchivedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadUnarchivedNotification",
"type": "object"
},
"TokenUsageBreakdown": {
"properties": {
"cachedInputTokens": {
@ -17020,4 +17086,4 @@
},
"title": "CodexAppServerProtocol",
"type": "object"
}
}

View file

@ -44,4 +44,4 @@
},
"title": "SkillsRemoteReadParams",
"type": "object"
}
}

View file

@ -14,4 +14,4 @@
],
"title": "SkillsRemoteWriteResponse",
"type": "object"
}
}

View file

@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadArchivedNotification",
"type": "object"
}

View file

@ -0,0 +1,13 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"threadId": {
"type": "string"
}
},
"required": [
"threadId"
],
"title": "ThreadUnarchivedNotification",
"type": "object"
}

View file

@ -28,9 +28,11 @@ import type { ReasoningSummaryPartAddedNotification } from "./v2/ReasoningSummar
import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummaryTextDeltaNotification";
import type { ReasoningTextDeltaNotification } from "./v2/ReasoningTextDeltaNotification";
import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification";
import type { ThreadArchivedNotification } from "./v2/ThreadArchivedNotification";
import type { ThreadNameUpdatedNotification } from "./v2/ThreadNameUpdatedNotification";
import type { ThreadStartedNotification } from "./v2/ThreadStartedNotification";
import type { ThreadTokenUsageUpdatedNotification } from "./v2/ThreadTokenUsageUpdatedNotification";
import type { ThreadUnarchivedNotification } from "./v2/ThreadUnarchivedNotification";
import type { TurnCompletedNotification } from "./v2/TurnCompletedNotification";
import type { TurnDiffUpdatedNotification } from "./v2/TurnDiffUpdatedNotification";
import type { TurnPlanUpdatedNotification } from "./v2/TurnPlanUpdatedNotification";
@ -40,4 +42,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
/**
* Notification sent from the server to the client.
*/
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };

View file

@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadArchivedNotification = { threadId: string, };

View file

@ -0,0 +1,5 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ThreadUnarchivedNotification = { threadId: string, };

View file

@ -147,6 +147,7 @@ export type { TextRange } from "./TextRange";
export type { Thread } from "./Thread";
export type { ThreadArchiveParams } from "./ThreadArchiveParams";
export type { ThreadArchiveResponse } from "./ThreadArchiveResponse";
export type { ThreadArchivedNotification } from "./ThreadArchivedNotification";
export type { ThreadCompactStartParams } from "./ThreadCompactStartParams";
export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse";
export type { ThreadForkParams } from "./ThreadForkParams";
@ -174,6 +175,7 @@ export type { ThreadTokenUsage } from "./ThreadTokenUsage";
export type { ThreadTokenUsageUpdatedNotification } from "./ThreadTokenUsageUpdatedNotification";
export type { ThreadUnarchiveParams } from "./ThreadUnarchiveParams";
export type { ThreadUnarchiveResponse } from "./ThreadUnarchiveResponse";
export type { ThreadUnarchivedNotification } from "./ThreadUnarchivedNotification";
export type { TokenUsageBreakdown } from "./TokenUsageBreakdown";
export type { ToolRequestUserInputAnswer } from "./ToolRequestUserInputAnswer";
export type { ToolRequestUserInputOption } from "./ToolRequestUserInputOption";

View file

@ -769,6 +769,8 @@ server_notification_definitions! {
/// NEW NOTIFICATIONS
Error => "error" (v2::ErrorNotification),
ThreadStarted => "thread/started" (v2::ThreadStartedNotification),
ThreadArchived => "thread/archived" (v2::ThreadArchivedNotification),
ThreadUnarchived => "thread/unarchived" (v2::ThreadUnarchivedNotification),
ThreadNameUpdated => "thread/name/updated" (v2::ThreadNameUpdatedNotification),
ThreadTokenUsageUpdated => "thread/tokenUsage/updated" (v2::ThreadTokenUsageUpdatedNotification),
TurnStarted => "turn/started" (v2::TurnStartedNotification),

View file

@ -2913,6 +2913,20 @@ pub struct ThreadStartedNotification {
pub thread: Thread,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadArchivedNotification {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
pub struct ThreadUnarchivedNotification {
pub thread_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View file

@ -120,9 +120,9 @@ Example with notification opt-out:
- `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, and `cwd` filters.
- `thread/loaded/list` — list the thread ids currently loaded in memory.
- `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`.
- `thread/archive` — move a threads rollout file into the archived directory; returns `{}` on success.
- `thread/archive` — move a threads rollout file into the archived directory; returns `{}` on success and emits `thread/archived`.
- `thread/name/set` — set or update a threads user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success.
- `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`.
- `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications.
- `thread/backgroundTerminals/clean` — terminate all running background terminals for a thread (experimental; requires `capabilities.experimentalApi`); returns `{}` when the cleanup request is accepted.
- `thread/rollback` — drop the last N turns from the agents in-memory context and persist a rollback marker in the rollout so future resumes see the pruned history; returns the updated `thread` (with `turns` populated) on success.
@ -274,6 +274,7 @@ Use `thread/archive` to move the persisted rollout (stored as a JSONL file on di
```json
{ "method": "thread/archive", "id": 21, "params": { "threadId": "thr_b" } }
{ "id": 21, "result": {} }
{ "method": "thread/archived", "params": { "threadId": "thr_b" } }
```
An archived thread will not appear in `thread/list` unless `archived` is set to `true`.
@ -285,6 +286,7 @@ Use `thread/unarchive` to move an archived rollout back into the sessions direct
```json
{ "method": "thread/unarchive", "id": 24, "params": { "threadId": "thr_b" } }
{ "id": 24, "result": { "thread": { "id": "thr_b" } } }
{ "method": "thread/unarchived", "params": { "threadId": "thr_b" } }
```
### Example: Trigger thread compaction
@ -519,7 +521,7 @@ Notes:
## Events
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `turn/*`, and `item/*` notifications.
Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/unarchived`, `turn/*`, and `item/*` notifications.
### Notification opt-out

View file

@ -122,6 +122,7 @@ use codex_app_server_protocol::SkillsRemoteWriteResponse;
use codex_app_server_protocol::Thread;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadArchivedNotification;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams;
use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse;
use codex_app_server_protocol::ThreadCompactStartParams;
@ -147,6 +148,7 @@ use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadStartedNotification;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::ThreadUnarchivedNotification;
use codex_app_server_protocol::Turn;
use codex_app_server_protocol::TurnInterruptParams;
use codex_app_server_protocol::TurnStartParams;
@ -2126,10 +2128,17 @@ impl CodexMessageProcessor {
}
};
let thread_id_str = thread_id.to_string();
match self.archive_thread_common(thread_id, &rollout_path).await {
Ok(()) => {
let response = ThreadArchiveResponse {};
self.outgoing.send_response(request_id, response).await;
let notification = ThreadArchivedNotification {
thread_id: thread_id_str,
};
self.outgoing
.send_server_notification(ServerNotification::ThreadArchived(notification))
.await;
}
Err(err) => {
self.outgoing.send_error(request_id, err).await;
@ -2335,8 +2344,13 @@ impl CodexMessageProcessor {
match result {
Ok(thread) => {
let thread_id = thread.id.clone();
let response = ThreadUnarchiveResponse { thread };
self.outgoing.send_response(request_id, response).await;
let notification = ThreadUnarchivedNotification { thread_id };
self.outgoing
.send_server_notification(ServerNotification::ThreadUnarchived(notification))
.await;
}
Err(err) => {
self.outgoing.send_error(request_id, err).await;

View file

@ -7,6 +7,7 @@ use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::ThreadArchiveResponse;
use codex_app_server_protocol::ThreadArchivedNotification;
use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::TurnStartParams;
@ -14,6 +15,7 @@ use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput;
use codex_core::ARCHIVED_SESSIONS_SUBDIR;
use codex_core::find_thread_path_by_id_str;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
@ -122,6 +124,17 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> {
)
.await??;
let _: ThreadArchiveResponse = to_response::<ThreadArchiveResponse>(archive_resp)?;
let archive_notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/archived"),
)
.await??;
let archived_notification: ThreadArchivedNotification = serde_json::from_value(
archive_notification
.params
.expect("thread/archived notification params"),
)?;
assert_eq!(archived_notification.thread_id, thread.id);
// Verify file moved.
let archived_directory = codex_home.path().join(ARCHIVED_SESSIONS_SUBDIR);

View file

@ -10,11 +10,13 @@ use codex_app_server_protocol::ThreadStartParams;
use codex_app_server_protocol::ThreadStartResponse;
use codex_app_server_protocol::ThreadUnarchiveParams;
use codex_app_server_protocol::ThreadUnarchiveResponse;
use codex_app_server_protocol::ThreadUnarchivedNotification;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnStartResponse;
use codex_app_server_protocol::UserInput;
use codex_core::find_archived_thread_path_by_id_str;
use codex_core::find_thread_path_by_id_str;
use pretty_assertions::assert_eq;
use std::fs::FileTimes;
use std::fs::OpenOptions;
use std::path::Path;
@ -120,6 +122,17 @@ async fn thread_unarchive_moves_rollout_back_into_sessions_directory() -> Result
let ThreadUnarchiveResponse {
thread: unarchived_thread,
} = to_response::<ThreadUnarchiveResponse>(unarchive_resp)?;
let unarchive_notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message("thread/unarchived"),
)
.await??;
let unarchived_notification: ThreadUnarchivedNotification = serde_json::from_value(
unarchive_notification
.params
.expect("thread/unarchived notification params"),
)?;
assert_eq!(unarchived_notification.thread_id, thread.id);
assert!(
unarchived_thread.updated_at > old_timestamp,
"expected updated_at to be bumped on unarchive"