diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index b5b85d88a..5ab197c84 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -2314,6 +2314,17 @@ ], "type": "object" }, + "ThreadUnsubscribeParams": { + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "type": "object" + }, "TurnInterruptParams": { "properties": { "threadId": { @@ -2805,6 +2816,30 @@ "title": "Thread/archiveRequest", "type": "object" }, + { + "properties": { + "id": { + "$ref": "#/definitions/RequestId" + }, + "method": { + "enum": [ + "thread/unsubscribe" + ], + "title": "Thread/unsubscribeRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadUnsubscribeParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Thread/unsubscribeRequest", + "type": "object" + }, { "properties": { "id": { diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 4de83a9ff..eca77d2bf 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -1726,6 +1726,17 @@ ], "type": "object" }, + "ThreadClosedNotification": { + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "type": "object" + }, "ThreadId": { "type": "string" }, @@ -3130,6 +3141,26 @@ "title": "Thread/unarchivedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "thread/closed" + ], + "title": "Thread/closedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/ThreadClosedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Thread/closedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 3ea58f1a2..8c7db1f97 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -475,6 +475,30 @@ "title": "Thread/archiveRequest", "type": "object" }, + { + "properties": { + "id": { + "$ref": "#/definitions/RequestId" + }, + "method": { + "enum": [ + "thread/unsubscribe" + ], + "title": "Thread/unsubscribeRequestMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ThreadUnsubscribeParams" + } + }, + "required": [ + "id", + "method", + "params" + ], + "title": "Thread/unsubscribeRequest", + "type": "object" + }, { "properties": { "id": { @@ -5903,6 +5927,26 @@ "title": "Thread/unarchivedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "thread/closed" + ], + "title": "Thread/closedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/ThreadClosedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "Thread/closedNotification", + "type": "object" + }, { "properties": { "method": { @@ -12543,6 +12587,19 @@ "title": "ThreadArchivedNotification", "type": "object" }, + "ThreadClosedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadClosedNotification", + "type": "object" + }, "ThreadCompactStartParams": { "$schema": "http://json-schema.org/draft-07/schema#", "properties": { @@ -14057,6 +14114,40 @@ "title": "ThreadUnarchivedNotification", "type": "object" }, + "ThreadUnsubscribeParams": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadUnsubscribeParams", + "type": "object" + }, + "ThreadUnsubscribeResponse": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "status": { + "$ref": "#/definitions/v2/ThreadUnsubscribeStatus" + } + }, + "required": [ + "status" + ], + "title": "ThreadUnsubscribeResponse", + "type": "object" + }, + "ThreadUnsubscribeStatus": { + "enum": [ + "notLoaded", + "notSubscribed", + "unsubscribed" + ], + "type": "string" + }, "TokenUsageBreakdown": { "properties": { "cachedInputTokens": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadClosedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadClosedNotification.json new file mode 100644 index 000000000..13e7f5774 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadClosedNotification.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadClosedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnsubscribeParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnsubscribeParams.json new file mode 100644 index 000000000..dec3670cc --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnsubscribeParams.json @@ -0,0 +1,13 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "threadId": { + "type": "string" + } + }, + "required": [ + "threadId" + ], + "title": "ThreadUnsubscribeParams", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnsubscribeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnsubscribeResponse.json new file mode 100644 index 000000000..2e545dbf9 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnsubscribeResponse.json @@ -0,0 +1,23 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "ThreadUnsubscribeStatus": { + "enum": [ + "notLoaded", + "notSubscribed", + "unsubscribed" + ], + "type": "string" + } + }, + "properties": { + "status": { + "$ref": "#/definitions/ThreadUnsubscribeStatus" + } + }, + "required": [ + "status" + ], + "title": "ThreadUnsubscribeResponse", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts b/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts index 4e8069046..970764400 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ClientRequest.ts @@ -52,6 +52,7 @@ import type { ThreadRollbackParams } from "./v2/ThreadRollbackParams"; import type { ThreadSetNameParams } from "./v2/ThreadSetNameParams"; import type { ThreadStartParams } from "./v2/ThreadStartParams"; import type { ThreadUnarchiveParams } from "./v2/ThreadUnarchiveParams"; +import type { ThreadUnsubscribeParams } from "./v2/ThreadUnsubscribeParams"; import type { TurnInterruptParams } from "./v2/TurnInterruptParams"; import type { TurnStartParams } from "./v2/TurnStartParams"; import type { TurnSteerParams } from "./v2/TurnSteerParams"; @@ -60,4 +61,4 @@ import type { WindowsSandboxSetupStartParams } from "./v2/WindowsSandboxSetupSta /** * Request from the client to the server. */ -export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/list", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/export", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/steer", id: RequestId, params: TurnSteerParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "experimentalFeature/list", id: RequestId, params: ExperimentalFeatureListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "windowsSandbox/setupStart", id: RequestId, params: WindowsSandboxSetupStartParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "externalAgentConfig/detect", id: RequestId, params: ExternalAgentConfigDetectParams, } | { "method": "externalAgentConfig/import", id: RequestId, params: ExternalAgentConfigImportParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, }; +export type ClientRequest ={ "method": "initialize", id: RequestId, params: InitializeParams, } | { "method": "thread/start", id: RequestId, params: ThreadStartParams, } | { "method": "thread/resume", id: RequestId, params: ThreadResumeParams, } | { "method": "thread/fork", id: RequestId, params: ThreadForkParams, } | { "method": "thread/archive", id: RequestId, params: ThreadArchiveParams, } | { "method": "thread/unsubscribe", id: RequestId, params: ThreadUnsubscribeParams, } | { "method": "thread/name/set", id: RequestId, params: ThreadSetNameParams, } | { "method": "thread/unarchive", id: RequestId, params: ThreadUnarchiveParams, } | { "method": "thread/compact/start", id: RequestId, params: ThreadCompactStartParams, } | { "method": "thread/rollback", id: RequestId, params: ThreadRollbackParams, } | { "method": "thread/list", id: RequestId, params: ThreadListParams, } | { "method": "thread/loaded/list", id: RequestId, params: ThreadLoadedListParams, } | { "method": "thread/read", id: RequestId, params: ThreadReadParams, } | { "method": "skills/list", id: RequestId, params: SkillsListParams, } | { "method": "skills/remote/list", id: RequestId, params: SkillsRemoteReadParams, } | { "method": "skills/remote/export", id: RequestId, params: SkillsRemoteWriteParams, } | { "method": "app/list", id: RequestId, params: AppsListParams, } | { "method": "skills/config/write", id: RequestId, params: SkillsConfigWriteParams, } | { "method": "turn/start", id: RequestId, params: TurnStartParams, } | { "method": "turn/steer", id: RequestId, params: TurnSteerParams, } | { "method": "turn/interrupt", id: RequestId, params: TurnInterruptParams, } | { "method": "review/start", id: RequestId, params: ReviewStartParams, } | { "method": "model/list", id: RequestId, params: ModelListParams, } | { "method": "experimentalFeature/list", id: RequestId, params: ExperimentalFeatureListParams, } | { "method": "mcpServer/oauth/login", id: RequestId, params: McpServerOauthLoginParams, } | { "method": "config/mcpServer/reload", id: RequestId, params: undefined, } | { "method": "mcpServerStatus/list", id: RequestId, params: ListMcpServerStatusParams, } | { "method": "windowsSandbox/setupStart", id: RequestId, params: WindowsSandboxSetupStartParams, } | { "method": "account/login/start", id: RequestId, params: LoginAccountParams, } | { "method": "account/login/cancel", id: RequestId, params: CancelLoginAccountParams, } | { "method": "account/logout", id: RequestId, params: undefined, } | { "method": "account/rateLimits/read", id: RequestId, params: undefined, } | { "method": "feedback/upload", id: RequestId, params: FeedbackUploadParams, } | { "method": "command/exec", id: RequestId, params: CommandExecParams, } | { "method": "config/read", id: RequestId, params: ConfigReadParams, } | { "method": "externalAgentConfig/detect", id: RequestId, params: ExternalAgentConfigDetectParams, } | { "method": "externalAgentConfig/import", id: RequestId, params: ExternalAgentConfigImportParams, } | { "method": "config/value/write", id: RequestId, params: ConfigValueWriteParams, } | { "method": "config/batchWrite", id: RequestId, params: ConfigBatchWriteParams, } | { "method": "configRequirements/read", id: RequestId, params: undefined, } | { "method": "account/read", id: RequestId, params: GetAccountParams, } | { "method": "newConversation", id: RequestId, params: NewConversationParams, } | { "method": "getConversationSummary", id: RequestId, params: GetConversationSummaryParams, } | { "method": "listConversations", id: RequestId, params: ListConversationsParams, } | { "method": "resumeConversation", id: RequestId, params: ResumeConversationParams, } | { "method": "forkConversation", id: RequestId, params: ForkConversationParams, } | { "method": "archiveConversation", id: RequestId, params: ArchiveConversationParams, } | { "method": "sendUserMessage", id: RequestId, params: SendUserMessageParams, } | { "method": "sendUserTurn", id: RequestId, params: SendUserTurnParams, } | { "method": "interruptConversation", id: RequestId, params: InterruptConversationParams, } | { "method": "addConversationListener", id: RequestId, params: AddConversationListenerParams, } | { "method": "removeConversationListener", id: RequestId, params: RemoveConversationListenerParams, } | { "method": "gitDiffToRemote", id: RequestId, params: GitDiffToRemoteParams, } | { "method": "loginApiKey", id: RequestId, params: LoginApiKeyParams, } | { "method": "loginChatGpt", id: RequestId, params: undefined, } | { "method": "cancelLoginChatGpt", id: RequestId, params: CancelLoginChatGptParams, } | { "method": "logoutChatGpt", id: RequestId, params: undefined, } | { "method": "getAuthStatus", id: RequestId, params: GetAuthStatusParams, } | { "method": "getUserSavedConfig", id: RequestId, params: undefined, } | { "method": "setDefaultModel", id: RequestId, params: SetDefaultModelParams, } | { "method": "getUserAgent", id: RequestId, params: undefined, } | { "method": "userInfo", id: RequestId, params: undefined, } | { "method": "fuzzyFileSearch", id: RequestId, params: FuzzyFileSearchParams, } | { "method": "execOneOffCommand", id: RequestId, params: ExecOneOffCommandParams, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index 92fecbfcc..9b082158c 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -29,6 +29,7 @@ import type { ReasoningSummaryTextDeltaNotification } from "./v2/ReasoningSummar import type { ReasoningTextDeltaNotification } from "./v2/ReasoningTextDeltaNotification"; import type { TerminalInteractionNotification } from "./v2/TerminalInteractionNotification"; import type { ThreadArchivedNotification } from "./v2/ThreadArchivedNotification"; +import type { ThreadClosedNotification } from "./v2/ThreadClosedNotification"; import type { ThreadNameUpdatedNotification } from "./v2/ThreadNameUpdatedNotification"; import type { ThreadRealtimeClosedNotification } from "./v2/ThreadRealtimeClosedNotification"; import type { ThreadRealtimeErrorNotification } from "./v2/ThreadRealtimeErrorNotification"; @@ -49,4 +50,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/status/changed", "params": ThreadStatusChangedNotification } | { "method": "thread/archived", "params": ThreadArchivedNotification } | { "method": "thread/unarchived", "params": ThreadUnarchivedNotification } | { "method": "thread/closed", "params": ThreadClosedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "model/rerouted", "params": ModelReroutedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "fuzzyFileSearch/sessionUpdated", "params": FuzzyFileSearchSessionUpdatedNotification } | { "method": "fuzzyFileSearch/sessionCompleted", "params": FuzzyFileSearchSessionCompletedNotification } | { "method": "thread/realtime/started", "params": ThreadRealtimeStartedNotification } | { "method": "thread/realtime/itemAdded", "params": ThreadRealtimeItemAddedNotification } | { "method": "thread/realtime/outputAudio/delta", "params": ThreadRealtimeOutputAudioDeltaNotification } | { "method": "thread/realtime/error", "params": ThreadRealtimeErrorNotification } | { "method": "thread/realtime/closed", "params": ThreadRealtimeClosedNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "windowsSandbox/setupCompleted", "params": WindowsSandboxSetupCompletedNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadClosedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadClosedNotification.ts new file mode 100644 index 000000000..ed5bf546c --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadClosedNotification.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadClosedNotification = { threadId: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeParams.ts new file mode 100644 index 000000000..3d5f3a04c --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeParams.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadUnsubscribeParams = { threadId: string, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeResponse.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeResponse.ts new file mode 100644 index 000000000..6f8f66b22 --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeResponse.ts @@ -0,0 +1,6 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ThreadUnsubscribeStatus } from "./ThreadUnsubscribeStatus"; + +export type ThreadUnsubscribeResponse = { status: ThreadUnsubscribeStatus, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeStatus.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeStatus.ts new file mode 100644 index 000000000..2970598dc --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadUnsubscribeStatus.ts @@ -0,0 +1,5 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ThreadUnsubscribeStatus = "notLoaded" | "notSubscribed" | "unsubscribed"; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index c7d62cc07..e49f5b49f 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -168,6 +168,7 @@ export type { ThreadActiveFlag } from "./ThreadActiveFlag"; export type { ThreadArchiveParams } from "./ThreadArchiveParams"; export type { ThreadArchiveResponse } from "./ThreadArchiveResponse"; export type { ThreadArchivedNotification } from "./ThreadArchivedNotification"; +export type { ThreadClosedNotification } from "./ThreadClosedNotification"; export type { ThreadCompactStartParams } from "./ThreadCompactStartParams"; export type { ThreadCompactStartResponse } from "./ThreadCompactStartResponse"; export type { ThreadForkParams } from "./ThreadForkParams"; @@ -204,6 +205,9 @@ export type { ThreadTokenUsageUpdatedNotification } from "./ThreadTokenUsageUpda export type { ThreadUnarchiveParams } from "./ThreadUnarchiveParams"; export type { ThreadUnarchiveResponse } from "./ThreadUnarchiveResponse"; export type { ThreadUnarchivedNotification } from "./ThreadUnarchivedNotification"; +export type { ThreadUnsubscribeParams } from "./ThreadUnsubscribeParams"; +export type { ThreadUnsubscribeResponse } from "./ThreadUnsubscribeResponse"; +export type { ThreadUnsubscribeStatus } from "./ThreadUnsubscribeStatus"; export type { TokenUsageBreakdown } from "./TokenUsageBreakdown"; export type { ToolRequestUserInputAnswer } from "./ToolRequestUserInputAnswer"; export type { ToolRequestUserInputOption } from "./ToolRequestUserInputOption"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 7a918ada0..357341cc4 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -203,6 +203,10 @@ client_request_definitions! { params: v2::ThreadArchiveParams, response: v2::ThreadArchiveResponse, }, + ThreadUnsubscribe => "thread/unsubscribe" { + params: v2::ThreadUnsubscribeParams, + response: v2::ThreadUnsubscribeResponse, + }, ThreadSetName => "thread/name/set" { params: v2::ThreadSetNameParams, response: v2::ThreadSetNameResponse, @@ -822,6 +826,7 @@ server_notification_definitions! { ThreadStatusChanged => "thread/status/changed" (v2::ThreadStatusChangedNotification), ThreadArchived => "thread/archived" (v2::ThreadArchivedNotification), ThreadUnarchived => "thread/unarchived" (v2::ThreadUnarchivedNotification), + ThreadClosed => "thread/closed" (v2::ThreadClosedNotification), ThreadNameUpdated => "thread/name/updated" (v2::ThreadNameUpdatedNotification), ThreadTokenUsageUpdated => "thread/tokenUsage/updated" (v2::ThreadTokenUsageUpdatedNotification), TurnStarted => "turn/started" (v2::TurnStartedNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 862460a4a..2fc234430 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1918,6 +1918,29 @@ pub struct ThreadArchiveParams { #[ts(export_to = "v2/")] pub struct ThreadArchiveResponse {} +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadUnsubscribeParams { + pub thread_id: String, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadUnsubscribeResponse { + pub status: ThreadUnsubscribeStatus, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub enum ThreadUnsubscribeStatus { + NotLoaded, + NotSubscribed, + Unsubscribed, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -3419,6 +3442,13 @@ pub struct ThreadUnarchivedNotification { pub thread_id: String, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadClosedNotification { + pub thread_id: String, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 8a77f4233..2c73429e2 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -127,6 +127,7 @@ Example with notification opt-out: - `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. - `thread/status/changed` — notification emitted when a loaded thread’s status changes (`threadId` + new `status`). - `thread/archive` — move a thread’s rollout file into the archived directory; returns `{}` on success and emits `thread/archived`. +- `thread/unsubscribe` — unsubscribe this connection from thread turn/item events. If this was the last subscriber, the server shuts down and unloads the thread, then emits `thread/closed`. - `thread/name/set` — set or update a thread’s user-facing name; returns `{}` on success. Thread names are not required to be unique; name lookups resolve to the most recently updated thread. - `thread/unarchive` — move an archived rollout file back into the sessions directory; returns the restored `thread` on success and emits `thread/unarchived`. - `thread/compact/start` — trigger conversation history compaction for a thread; returns `{}` immediately while progress streams through standard turn/item notifications. @@ -283,6 +284,26 @@ When `nextCursor` is `null`, you’ve reached the final page. } } ``` +### Example: Unsubscribe from a loaded thread + +`thread/unsubscribe` removes the current connection's subscription to a thread. The response status is one of: + +- `unsubscribed` when the connection was subscribed and is now removed. +- `notSubscribed` when the connection was not subscribed to that thread. +- `notLoaded` when the thread is not loaded. + +If this was the last subscriber, the server unloads the thread and emits `thread/closed` and a `thread/status/changed` transition to `notLoaded`. + +```json +{ "method": "thread/unsubscribe", "id": 22, "params": { "threadId": "thr_123" } } +{ "id": 22, "result": { "status": "unsubscribed" } } +{ "method": "thread/status/changed", "params": { + "threadId": "thr_123", + "status": { "type": "notLoaded" } +} } +{ "method": "thread/closed", "params": { "threadId": "thr_123" } } +``` + ### Example: Read a thread Use `thread/read` to fetch a stored thread by id without resuming it. Pass `includeTurns` when you want the rollout history loaded into `thread.turns`. The returned thread includes `agentNickname` and `agentRole` for AgentControl-spawned thread sub-agents when available. @@ -555,7 +576,7 @@ Notes: ## Events -Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/unarchived`, `turn/*`, and `item/*` notifications. +Event notifications are the server-initiated event stream for thread lifecycles, turn lifecycles, and the items within them. After you start or resume a thread, keep reading stdout for `thread/started`, `thread/archived`, `thread/unarchived`, `thread/closed`, `turn/*`, and `item/*` notifications. Thread realtime uses a separate thread-scoped notification surface. `thread/realtime/*` notifications are ephemeral transport events, not `ThreadItem`s, and are not returned by `thread/read`, `thread/resume`, or `thread/fork`. diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index c489208ac..683fca361 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -2390,7 +2390,11 @@ mod tests { let event_turn_id = "complete1".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); let thread_state = new_thread_state(); handle_turn_complete( @@ -2431,7 +2435,11 @@ mod tests { .await; let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); handle_turn_interrupted( conversation_id, @@ -2471,7 +2479,11 @@ mod tests { .await; let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); handle_turn_complete( conversation_id, @@ -2505,7 +2517,11 @@ mod tests { async fn test_handle_turn_plan_update_emits_notification_for_v2() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); let update = UpdatePlanArgs { explanation: Some("need plan".to_string()), plan: vec![ @@ -2555,7 +2571,11 @@ mod tests { let turn_id = "turn-123".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); let info = TokenUsageInfo { total_token_usage: TokenUsage { @@ -2639,7 +2659,11 @@ mod tests { let turn_id = "turn-456".to_string(); let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); handle_token_count_event( conversation_id, @@ -2706,7 +2730,11 @@ mod tests { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); // Turn 1 on conversation A let a_turn1 = "a_turn1".to_string(); @@ -2929,7 +2957,11 @@ mod tests { async fn test_handle_turn_diff_emits_v2_notification() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); let unified_diff = "--- a\n+++ b\n".to_string(); let conversation_id = ThreadId::new(); @@ -2963,7 +2995,11 @@ mod tests { async fn test_handle_turn_diff_is_noop_for_v1() -> Result<()> { let (tx, mut rx) = mpsc::channel(CHANNEL_CAPACITY); let outgoing = Arc::new(OutgoingMessageSender::new(tx)); - let outgoing = ThreadScopedOutgoingMessageSender::new(outgoing, vec![ConnectionId(1)]); + let outgoing = ThreadScopedOutgoingMessageSender::new( + outgoing, + vec![ConnectionId(1)], + ThreadId::new(), + ); let conversation_id = ThreadId::new(); handle_turn_diff( diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 189789871..520c9475c 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -127,6 +127,7 @@ use codex_app_server_protocol::ThreadArchiveResponse; use codex_app_server_protocol::ThreadArchivedNotification; use codex_app_server_protocol::ThreadBackgroundTerminalsCleanParams; use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse; +use codex_app_server_protocol::ThreadClosedNotification; use codex_app_server_protocol::ThreadCompactStartParams; use codex_app_server_protocol::ThreadCompactStartResponse; use codex_app_server_protocol::ThreadForkParams; @@ -160,6 +161,9 @@ use codex_app_server_protocol::ThreadStatus; use codex_app_server_protocol::ThreadUnarchiveParams; use codex_app_server_protocol::ThreadUnarchiveResponse; use codex_app_server_protocol::ThreadUnarchivedNotification; +use codex_app_server_protocol::ThreadUnsubscribeParams; +use codex_app_server_protocol::ThreadUnsubscribeResponse; +use codex_app_server_protocol::ThreadUnsubscribeStatus; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnInterruptParams; use codex_app_server_protocol::TurnStartParams; @@ -323,6 +327,12 @@ enum AppListLoadResult { Directory(Result, String>), } +enum ThreadShutdownResult { + Complete, + SubmitFailed, + TimedOut, +} + fn convert_remote_scope(scope: ApiHazelnutScope) -> RemoteSkillHazelnutScope { match scope { ApiHazelnutScope::WorkspaceShared => RemoteSkillHazelnutScope::WorkspaceShared, @@ -358,6 +368,7 @@ pub(crate) struct CodexMessageProcessor { cli_overrides: Vec<(String, TomlValue)>, cloud_requirements: Arc>, active_login: Arc>>, + pending_thread_unloads: Arc>>, thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, pending_fuzzy_searches: Arc>>>, @@ -430,6 +441,7 @@ impl CodexMessageProcessor { cli_overrides, cloud_requirements, active_login: Arc::new(Mutex::new(None)), + pending_thread_unloads: Arc::new(Mutex::new(HashSet::new())), thread_state_manager: ThreadStateManager::new(), thread_watch_manager: ThreadWatchManager::new_with_outgoing(outgoing), pending_fuzzy_searches: Arc::new(Mutex::new(HashMap::new())), @@ -557,6 +569,10 @@ impl CodexMessageProcessor { self.thread_start(to_connection_request_id(request_id), params) .await; } + ClientRequest::ThreadUnsubscribe { request_id, params } => { + self.thread_unsubscribe(to_connection_request_id(request_id), params) + .await; + } ClientRequest::ThreadResume { request_id, params } => { self.thread_resume(to_connection_request_id(request_id), params) .await; @@ -2876,6 +2892,23 @@ impl CodexMessageProcessor { } async fn thread_resume(&mut self, request_id: ConnectionRequestId, params: ThreadResumeParams) { + if let Ok(thread_id) = ThreadId::from_string(¶ms.thread_id) + && self + .pending_thread_unloads + .lock() + .await + .contains(&thread_id) + { + self.send_invalid_request_error( + request_id, + format!( + "thread {thread_id} is closing; retry thread/resume after the thread is closed" + ), + ) + .await; + return; + } + if self .resume_running_thread(request_id.clone(), ¶ms) .await @@ -4729,6 +4762,150 @@ impl CodexMessageProcessor { } } + async fn wait_for_thread_shutdown(thread: &Arc) -> ThreadShutdownResult { + match thread.submit(Op::Shutdown).await { + Ok(_) => { + let wait_for_shutdown = async { + loop { + if matches!(thread.agent_status().await, AgentStatus::Shutdown) { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }; + if tokio::time::timeout(Duration::from_secs(10), wait_for_shutdown) + .await + .is_err() + { + ThreadShutdownResult::TimedOut + } else { + ThreadShutdownResult::Complete + } + } + Err(_) => ThreadShutdownResult::SubmitFailed, + } + } + + async fn finalize_thread_teardown(&mut self, thread_id: ThreadId) { + self.pending_thread_unloads.lock().await.remove(&thread_id); + self.outgoing.cancel_requests_for_thread(thread_id).await; + self.thread_state_manager + .remove_thread_state(thread_id) + .await; + self.thread_watch_manager + .remove_thread(&thread_id.to_string()) + .await; + } + + async fn thread_unsubscribe( + &mut self, + request_id: ConnectionRequestId, + params: ThreadUnsubscribeParams, + ) { + let thread_id = match ThreadId::from_string(¶ms.thread_id) { + Ok(id) => id, + Err(err) => { + self.send_invalid_request_error(request_id, format!("invalid thread id: {err}")) + .await; + return; + } + }; + + let Ok(thread) = self.thread_manager.get_thread(thread_id).await else { + // Reconcile stale app-server bookkeeping when the thread has already been + // removed from the core manager. This keeps loaded-status/subscription state + // consistent with the source of truth before reporting NotLoaded. + self.finalize_thread_teardown(thread_id).await; + self.outgoing + .send_response( + request_id, + ThreadUnsubscribeResponse { + status: ThreadUnsubscribeStatus::NotLoaded, + }, + ) + .await; + return; + }; + + let was_subscribed = self + .thread_state_manager + .unsubscribe_connection_from_thread(thread_id, request_id.connection_id) + .await; + if !was_subscribed { + self.outgoing + .send_response( + request_id, + ThreadUnsubscribeResponse { + status: ThreadUnsubscribeStatus::NotSubscribed, + }, + ) + .await; + return; + } + + if !self.thread_state_manager.has_subscribers(thread_id).await { + // This connection was the last subscriber. Only now do we unload the thread. + info!("thread {thread_id} has no subscribers; shutting down"); + self.pending_thread_unloads.lock().await.insert(thread_id); + // Any pending app-server -> client requests for this thread can no longer be + // answered; cancel their callbacks before shutdown/unload. + self.outgoing.cancel_requests_for_thread(thread_id).await; + self.thread_state_manager + .remove_thread_state(thread_id) + .await; + + let outgoing = self.outgoing.clone(); + let pending_thread_unloads = self.pending_thread_unloads.clone(); + let thread_manager = self.thread_manager.clone(); + let thread_watch_manager = self.thread_watch_manager.clone(); + tokio::spawn(async move { + match Self::wait_for_thread_shutdown(&thread).await { + ThreadShutdownResult::Complete => { + if thread_manager.remove_thread(&thread_id).await.is_none() { + info!( + "thread {thread_id} was already removed before unsubscribe finalized" + ); + thread_watch_manager + .remove_thread(&thread_id.to_string()) + .await; + pending_thread_unloads.lock().await.remove(&thread_id); + return; + } + thread_watch_manager + .remove_thread(&thread_id.to_string()) + .await; + let notification = ThreadClosedNotification { + thread_id: thread_id.to_string(), + }; + outgoing + .send_server_notification(ServerNotification::ThreadClosed( + notification, + )) + .await; + pending_thread_unloads.lock().await.remove(&thread_id); + } + ThreadShutdownResult::SubmitFailed => { + pending_thread_unloads.lock().await.remove(&thread_id); + warn!("failed to submit Shutdown to thread {thread_id}"); + } + ThreadShutdownResult::TimedOut => { + pending_thread_unloads.lock().await.remove(&thread_id); + warn!("thread {thread_id} shutdown timed out; leaving thread loaded"); + } + } + }); + } + + self.outgoing + .send_response( + request_id, + ThreadUnsubscribeResponse { + status: ThreadUnsubscribeStatus::Unsubscribed, + }, + ) + .await; + } + async fn archive_thread_common( &mut self, thread_id: ThreadId, @@ -4800,37 +4977,19 @@ impl CodexMessageProcessor { state_db_ctx = Some(ctx); } info!("thread {thread_id} was active; shutting down"); - // Request shutdown. - match conversation.submit(Op::Shutdown).await { - Ok(_) => { - // Poll agent status rather than consuming events so attached listeners do not block shutdown. - let wait_for_shutdown = async { - loop { - if matches!(conversation.agent_status().await, AgentStatus::Shutdown) { - break; - } - tokio::time::sleep(Duration::from_millis(50)).await; - } - }; - if tokio::time::timeout(Duration::from_secs(10), wait_for_shutdown) - .await - .is_err() - { - warn!("thread {thread_id} shutdown timed out; proceeding with archive"); - } + match Self::wait_for_thread_shutdown(&conversation).await { + ThreadShutdownResult::Complete => {} + ThreadShutdownResult::SubmitFailed => { + error!( + "failed to submit Shutdown to thread {thread_id}; proceeding with archive" + ); } - Err(err) => { - error!("failed to submit Shutdown to thread {thread_id}: {err}"); + ThreadShutdownResult::TimedOut => { + warn!("thread {thread_id} shutdown timed out; proceeding with archive"); } } - self.thread_state_manager - .remove_thread_state(thread_id) - .await; } - - self.thread_watch_manager - .remove_thread(&thread_id.to_string()) - .await; + self.finalize_thread_teardown(thread_id).await; if state_db_ctx.is_none() { state_db_ctx = get_state_db(&self.config, None).await; @@ -6180,6 +6339,7 @@ impl CodexMessageProcessor { let thread_outgoing = ThreadScopedOutgoingMessageSender::new( outgoing_for_task.clone(), subscribed_connection_ids, + conversation_id, ); apply_bespoke_event_handling( event.clone(), @@ -6482,10 +6642,8 @@ impl CodexMessageProcessor { WindowsSandboxSetupMode::Unelevated => CoreWindowsSandboxSetupMode::Unelevated, }; let config = Arc::clone(&self.config); - let outgoing = ThreadScopedOutgoingMessageSender::new( - Arc::clone(&self.outgoing), - vec![request_id.connection_id], - ); + let outgoing = Arc::clone(&self.outgoing); + let connection_id = request_id.connection_id; tokio::spawn(async move { let setup_request = WindowsSandboxSetupRequest { @@ -6508,9 +6666,10 @@ impl CodexMessageProcessor { error: setup_result.err().map(|err| err.to_string()), }; outgoing - .send_server_notification(ServerNotification::WindowsSandboxSetupCompleted( - notification, - )) + .send_server_notification_to_connections( + &[connection_id], + ServerNotification::WindowsSandboxSetupCompleted(notification), + ) .await; }); } diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 252705724..edd512434 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -87,7 +87,7 @@ impl ExternalAuthRefresher for ExternalAuthRefreshBridge { let (request_id, rx) = self .outgoing - .send_request_with_id(ServerRequestPayload::ChatgptAuthTokensRefresh(params)) + .send_request(ServerRequestPayload::ChatgptAuthTokensRefresh(params)) .await; let result = match timeout(EXTERNAL_AUTH_REFRESH_TIMEOUT, rx).await { diff --git a/codex-rs/app-server/src/outgoing_message.rs b/codex-rs/app-server/src/outgoing_message.rs index fe8f75588..44de78345 100644 --- a/codex-rs/app-server/src/outgoing_message.rs +++ b/codex-rs/app-server/src/outgoing_message.rs @@ -9,6 +9,7 @@ use codex_app_server_protocol::Result; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; use codex_app_server_protocol::ServerRequestPayload; +use codex_protocol::ThreadId; use serde::Serialize; use tokio::sync::Mutex; use tokio::sync::mpsc; @@ -48,23 +49,31 @@ pub(crate) enum OutgoingEnvelope { pub(crate) struct OutgoingMessageSender { next_server_request_id: AtomicI64, sender: mpsc::Sender, - request_id_to_callback: Mutex>>, + request_id_to_callback: Mutex>, } #[derive(Clone)] pub(crate) struct ThreadScopedOutgoingMessageSender { outgoing: Arc, connection_ids: Arc>, + thread_id: ThreadId, +} + +struct PendingCallbackEntry { + callback: oneshot::Sender, + thread_id: Option, } impl ThreadScopedOutgoingMessageSender { pub(crate) fn new( outgoing: Arc, connection_ids: Vec, + thread_id: ThreadId, ) -> Self { Self { outgoing, connection_ids: Arc::new(connection_ids), + thread_id, } } @@ -72,12 +81,12 @@ impl ThreadScopedOutgoingMessageSender { &self, payload: ServerRequestPayload, ) -> oneshot::Receiver { - if self.connection_ids.is_empty() { - let (_tx, rx) = oneshot::channel(); - return rx; - } self.outgoing - .send_request_to_connections(self.connection_ids.as_slice(), payload) + .send_request_to_thread_connections( + self.thread_id, + self.connection_ids.as_slice(), + payload, + ) .await } @@ -116,35 +125,52 @@ impl OutgoingMessageSender { } } - pub(crate) async fn send_request_to_connections( - &self, - connection_ids: &[ConnectionId], - request: ServerRequestPayload, - ) -> oneshot::Receiver { - let (_id, rx) = self - .send_request_with_id_to_connections(connection_ids, request) - .await; - rx - } - - pub(crate) async fn send_request_with_id( + pub(crate) async fn send_request( &self, request: ServerRequestPayload, ) -> (RequestId, oneshot::Receiver) { - self.send_request_with_id_to_connections(&[], request).await + self.send_request_with_id_to_connections(&[], request, None) + .await + } + + async fn send_request_to_thread_connections( + &self, + thread_id: ThreadId, + connection_ids: &[ConnectionId], + request: ServerRequestPayload, + ) -> oneshot::Receiver { + if connection_ids.is_empty() { + let (_tx, rx) = oneshot::channel(); + return rx; + } + let (_request_id, receiver) = self + .send_request_with_id_to_connections(connection_ids, request, Some(thread_id)) + .await; + receiver + } + + fn next_request_id(&self) -> RequestId { + RequestId::Integer(self.next_server_request_id.fetch_add(1, Ordering::Relaxed)) } async fn send_request_with_id_to_connections( &self, connection_ids: &[ConnectionId], request: ServerRequestPayload, + thread_id: Option, ) -> (RequestId, oneshot::Receiver) { - let id = RequestId::Integer(self.next_server_request_id.fetch_add(1, Ordering::Relaxed)); + let id = self.next_request_id(); let outgoing_message_id = id.clone(); let (tx_approve, rx_approve) = oneshot::channel(); { let mut request_id_to_callback = self.request_id_to_callback.lock().await; - request_id_to_callback.insert(id, tx_approve); + request_id_to_callback.insert( + id, + PendingCallbackEntry { + callback: tx_approve, + thread_id, + }, + ); } let outgoing_message = @@ -191,8 +217,8 @@ impl OutgoingMessageSender { }; match entry { - Some((id, sender)) => { - if let Err(err) = sender.send(Ok(result)) { + Some((id, entry)) => { + if let Err(err) = entry.callback.send(Ok(result)) { warn!("could not notify callback for {id:?} due to: {err:?}"); } } @@ -209,9 +235,9 @@ impl OutgoingMessageSender { }; match entry { - Some((id, sender)) => { + Some((id, entry)) => { warn!("client responded with error for {id:?}: {error:?}"); - if let Err(err) = sender.send(Err(error)) { + if let Err(err) = entry.callback.send(Err(error)) { warn!("could not notify callback for {id:?} due to: {err:?}"); } } @@ -229,6 +255,19 @@ impl OutgoingMessageSender { entry.is_some() } + pub(crate) async fn cancel_requests_for_thread(&self, thread_id: ThreadId) { + let mut request_id_to_callback = self.request_id_to_callback.lock().await; + let request_ids = request_id_to_callback + .iter() + .filter_map(|(request_id, entry)| { + (entry.thread_id == Some(thread_id)).then_some(request_id.clone()) + }) + .collect::>(); + for request_id in request_ids { + request_id_to_callback.remove(&request_id); + } + } + pub(crate) async fn send_response( &self, request_id: ConnectionRequestId, @@ -657,7 +696,7 @@ mod tests { let outgoing = OutgoingMessageSender::new(tx); let (request_id, wait_for_result) = outgoing - .send_request_with_id(ServerRequestPayload::ApplyPatchApproval( + .send_request(ServerRequestPayload::ApplyPatchApproval( ApplyPatchApprovalParams { conversation_id: ThreadId::new(), call_id: "call-id".to_string(), diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 19bb025de..04e2a8240 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -207,6 +207,50 @@ impl ThreadStateManager { }); } + pub(crate) async fn unsubscribe_connection_from_thread( + &mut self, + thread_id: ThreadId, + connection_id: ConnectionId, + ) -> bool { + let Some(thread_state) = self.thread_states.get(&thread_id) else { + return false; + }; + + if !self + .thread_ids_by_connection + .get(&connection_id) + .is_some_and(|thread_ids| thread_ids.contains(&thread_id)) + { + return false; + } + + if let Some(thread_ids) = self.thread_ids_by_connection.get_mut(&connection_id) { + thread_ids.remove(&thread_id); + if thread_ids.is_empty() { + self.thread_ids_by_connection.remove(&connection_id); + } + } + + self.subscription_state_by_id.retain(|_, state| { + !(state.thread_id == thread_id && state.connection_id == connection_id) + }); + + let mut thread_state = thread_state.lock().await; + thread_state.remove_connection(connection_id); + true + } + + pub(crate) async fn has_subscribers(&self, thread_id: ThreadId) -> bool { + let Some(thread_state) = self.thread_states.get(&thread_id) else { + return false; + }; + !thread_state + .lock() + .await + .subscribed_connection_ids() + .is_empty() + } + pub(crate) async fn set_listener( &mut self, subscription_id: Uuid, diff --git a/codex-rs/app-server/src/thread_status.rs b/codex-rs/app-server/src/thread_status.rs index 2eb1d8d03..f3a7c1fd8 100644 --- a/codex-rs/app-server/src/thread_status.rs +++ b/codex-rs/app-server/src/thread_status.rs @@ -300,8 +300,16 @@ impl ThreadWatchState { } fn remove_thread(&mut self, thread_id: &str) -> Option { + let previous_status = self.status_for(thread_id); self.runtime_by_thread_id.remove(thread_id); - None + if previous_status.is_some() && previous_status != Some(ThreadStatus::NotLoaded) { + Some(ThreadStatusChangedNotification { + thread_id: thread_id.to_string(), + status: ThreadStatus::NotLoaded, + }) + } else { + None + } } fn update_runtime( @@ -673,6 +681,15 @@ mod tests { }, }, ); + + manager.remove_thread(INTERACTIVE_THREAD_ID).await; + assert_eq!( + recv_status_changed_notification(&mut outgoing_rx).await, + ThreadStatusChangedNotification { + thread_id: INTERACTIVE_THREAD_ID.to_string(), + status: ThreadStatus::NotLoaded, + }, + ); } async fn wait_for_status( diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index f1917add8..cb0c98806 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -66,6 +66,7 @@ use codex_app_server_protocol::ThreadRollbackParams; use codex_app_server_protocol::ThreadSetNameParams; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadUnarchiveParams; +use codex_app_server_protocol::ThreadUnsubscribeParams; use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnInterruptParams; use codex_app_server_protocol::TurnStartParams; @@ -433,6 +434,15 @@ impl McpProcess { self.send_request("thread/name/set", params).await } + /// Send a `thread/unsubscribe` JSON-RPC request. + pub async fn send_thread_unsubscribe_request( + &mut self, + params: ThreadUnsubscribeParams, + ) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("thread/unsubscribe", params).await + } + /// Send a `thread/unarchive` JSON-RPC request. pub async fn send_thread_unarchive_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index 8afb1e3f4..244413097 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -31,6 +31,7 @@ mod thread_rollback; mod thread_start; mod thread_status; mod thread_unarchive; +mod thread_unsubscribe; mod turn_interrupt; mod turn_start; mod turn_start_zsh_fork; diff --git a/codex-rs/app-server/tests/suite/v2/thread_archive.rs b/codex-rs/app-server/tests/suite/v2/thread_archive.rs index d7aa60f2b..20fd6fd88 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_archive.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_archive.rs @@ -8,8 +8,13 @@ use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ThreadArchiveParams; use codex_app_server_protocol::ThreadArchiveResponse; use codex_app_server_protocol::ThreadArchivedNotification; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadUnarchiveParams; +use codex_app_server_protocol::ThreadUnarchiveResponse; use codex_app_server_protocol::TurnStartParams; use codex_app_server_protocol::TurnStartResponse; use codex_app_server_protocol::UserInput; @@ -155,6 +160,140 @@ async fn thread_archive_requires_materialized_rollout() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_archive_clears_stale_subscriptions_before_resume() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_start_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "materialize".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let turn_start_response: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(turn_start_id)), + ) + .await??; + let _: TurnStartResponse = to_response::(turn_start_response)?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + primary.clear_message_buffer(); + + let mut secondary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??; + + let archive_id = primary + .send_thread_archive_request(ThreadArchiveParams { + thread_id: thread.id.clone(), + }) + .await?; + let archive_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(archive_id)), + ) + .await??; + let _: ThreadArchiveResponse = to_response::(archive_resp)?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("thread/archived"), + ) + .await??; + + let unarchive_id = primary + .send_thread_unarchive_request(ThreadUnarchiveParams { + thread_id: thread.id.clone(), + }) + .await?; + let unarchive_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(unarchive_id)), + ) + .await??; + let _: ThreadUnarchiveResponse = to_response::(unarchive_resp)?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("thread/unarchived"), + ) + .await??; + primary.clear_message_buffer(); + + let resume_id = secondary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + secondary.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let resume: ThreadResumeResponse = to_response::(resume_resp)?; + assert_eq!(resume.thread.status, ThreadStatus::Idle); + primary.clear_message_buffer(); + secondary.clear_message_buffer(); + + let resumed_turn_id = secondary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id, + input: vec![UserInput::Text { + text: "secondary turn".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let resumed_turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + secondary.read_stream_until_response_message(RequestId::Integer(resumed_turn_id)), + ) + .await??; + let _: TurnStartResponse = to_response::(resumed_turn_resp)?; + + assert!( + timeout( + std::time::Duration::from_millis(250), + primary.read_stream_until_notification_message("turn/started"), + ) + .await + .is_err() + ); + + timeout( + DEFAULT_READ_TIMEOUT, + secondary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + Ok(()) +} + fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write(config_toml, config_contents(server_uri)) diff --git a/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs b/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs new file mode 100644 index 000000000..f16ef8bb0 --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/thread_unsubscribe.rs @@ -0,0 +1,383 @@ +use anyhow::Context; +use anyhow::Result; +use app_test_support::McpProcess; +use app_test_support::create_mock_responses_server_repeating_assistant; +use app_test_support::create_mock_responses_server_sequence; +use app_test_support::create_shell_command_sse_response; +use app_test_support::to_response; +use codex_app_server_protocol::ItemStartedNotification; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::ThreadLoadedListParams; +use codex_app_server_protocol::ThreadLoadedListResponse; +use codex_app_server_protocol::ThreadReadParams; +use codex_app_server_protocol::ThreadReadResponse; +use codex_app_server_protocol::ThreadResumeParams; +use codex_app_server_protocol::ThreadResumeResponse; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; +use codex_app_server_protocol::ThreadStatus; +use codex_app_server_protocol::ThreadStatusChangedNotification; +use codex_app_server_protocol::ThreadUnsubscribeParams; +use codex_app_server_protocol::ThreadUnsubscribeResponse; +use codex_app_server_protocol::ThreadUnsubscribeStatus; +use codex_app_server_protocol::TurnStartParams; +use codex_app_server_protocol::TurnStartResponse; +use codex_app_server_protocol::UserInput as V2UserInput; +use core_test_support::responses; +use pretty_assertions::assert_eq; +use tempfile::TempDir; +use tokio::time::timeout; + +const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); + +#[tokio::test] +async fn thread_unsubscribe_unloads_thread_and_emits_thread_closed_notification() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_id = start_thread(&mut mcp).await?; + + let unsubscribe_id = mcp + .send_thread_unsubscribe_request(ThreadUnsubscribeParams { + thread_id: thread_id.clone(), + }) + .await?; + let unsubscribe_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(unsubscribe_id)), + ) + .await??; + let unsubscribe = to_response::(unsubscribe_resp)?; + assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed); + + let closed_notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/closed"), + ) + .await??; + let parsed: ServerNotification = closed_notif.try_into()?; + let ServerNotification::ThreadClosed(payload) = parsed else { + anyhow::bail!("expected thread/closed notification"); + }; + assert_eq!(payload.thread_id, thread_id); + + let status_changed = wait_for_thread_status_not_loaded(&mut mcp, &payload.thread_id).await?; + assert_eq!(status_changed.thread_id, payload.thread_id); + assert_eq!(status_changed.status, ThreadStatus::NotLoaded); + + let list_id = mcp + .send_thread_loaded_list_request(ThreadLoadedListParams::default()) + .await?; + let list_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(list_id)), + ) + .await??; + let ThreadLoadedListResponse { data, next_cursor } = + to_response::(list_resp)?; + assert_eq!(data, Vec::::new()); + assert_eq!(next_cursor, None); + + Ok(()) +} + +#[tokio::test] +async fn thread_unsubscribe_during_turn_interrupts_turn_and_emits_thread_closed() -> Result<()> { + #[cfg(target_os = "windows")] + let shell_command = vec![ + "powershell".to_string(), + "-Command".to_string(), + "Start-Sleep -Seconds 10".to_string(), + ]; + #[cfg(not(target_os = "windows"))] + let shell_command = vec!["sleep".to_string(), "10".to_string()]; + + let tmp = TempDir::new()?; + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home)?; + let working_directory = tmp.path().join("workdir"); + std::fs::create_dir(&working_directory)?; + + let server = create_mock_responses_server_sequence(vec![create_shell_command_sse_response( + shell_command.clone(), + Some(&working_directory), + Some(10_000), + "call_sleep", + )?]) + .await; + create_config_toml(&codex_home, &server.uri())?; + + let mut mcp = McpProcess::new(&codex_home).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_id = start_thread(&mut mcp).await?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread_id.clone(), + input: vec![V2UserInput::Text { + text: "run sleep".to_string(), + text_elements: Vec::new(), + }], + cwd: Some(working_directory), + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let _: TurnStartResponse = to_response::(turn_resp)?; + + timeout( + DEFAULT_READ_TIMEOUT, + wait_for_command_execution_item_started(&mut mcp), + ) + .await??; + + let unsubscribe_id = mcp + .send_thread_unsubscribe_request(ThreadUnsubscribeParams { + thread_id: thread_id.clone(), + }) + .await?; + let unsubscribe_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(unsubscribe_id)), + ) + .await??; + let unsubscribe = to_response::(unsubscribe_resp)?; + assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed); + + let closed_notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/closed"), + ) + .await??; + let parsed: ServerNotification = closed_notif.try_into()?; + let ServerNotification::ThreadClosed(payload) = parsed else { + anyhow::bail!("expected thread/closed notification"); + }; + assert_eq!(payload.thread_id, thread_id); + + Ok(()) +} + +#[tokio::test] +async fn thread_unsubscribe_clears_cached_status_before_resume() -> Result<()> { + let server = responses::start_mock_server().await; + let _response_mock = responses::mount_sse_once( + &server, + responses::sse_failed("resp-1", "server_error", "simulated failure"), + ) + .await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_id = start_thread(&mut mcp).await?; + + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread_id.clone(), + input: vec![V2UserInput::Text { + text: "fail this turn".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + let turn_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + let _: TurnStartResponse = to_response::(turn_resp)?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("error"), + ) + .await??; + + let read_id = mcp + .send_thread_read_request(ThreadReadParams { + thread_id: thread_id.clone(), + include_turns: false, + }) + .await?; + let read_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(read_id)), + ) + .await??; + let ThreadReadResponse { thread } = to_response::(read_resp)?; + assert_eq!(thread.status, ThreadStatus::SystemError); + + let unsubscribe_id = mcp + .send_thread_unsubscribe_request(ThreadUnsubscribeParams { + thread_id: thread_id.clone(), + }) + .await?; + let unsubscribe_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(unsubscribe_id)), + ) + .await??; + let unsubscribe = to_response::(unsubscribe_resp)?; + assert_eq!(unsubscribe.status, ThreadUnsubscribeStatus::Unsubscribed); + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/closed"), + ) + .await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let resume: ThreadResumeResponse = to_response::(resume_resp)?; + assert_eq!(resume.thread.status, ThreadStatus::Idle); + + Ok(()) +} + +#[tokio::test] +async fn thread_unsubscribe_reports_not_loaded_after_thread_is_unloaded() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_id = start_thread(&mut mcp).await?; + + let first_unsubscribe_id = mcp + .send_thread_unsubscribe_request(ThreadUnsubscribeParams { + thread_id: thread_id.clone(), + }) + .await?; + let first_unsubscribe_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(first_unsubscribe_id)), + ) + .await??; + let first_unsubscribe = to_response::(first_unsubscribe_resp)?; + assert_eq!( + first_unsubscribe.status, + ThreadUnsubscribeStatus::Unsubscribed + ); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/closed"), + ) + .await??; + + let second_unsubscribe_id = mcp + .send_thread_unsubscribe_request(ThreadUnsubscribeParams { thread_id }) + .await?; + let second_unsubscribe_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_unsubscribe_id)), + ) + .await??; + let second_unsubscribe = to_response::(second_unsubscribe_resp)?; + assert_eq!( + second_unsubscribe.status, + ThreadUnsubscribeStatus::NotLoaded + ); + + Ok(()) +} + +async fn wait_for_command_execution_item_started(mcp: &mut McpProcess) -> Result<()> { + loop { + let started_notif = mcp + .read_stream_until_notification_message("item/started") + .await?; + let started_params = started_notif.params.context("item/started params")?; + let started: ItemStartedNotification = serde_json::from_value(started_params)?; + if let ThreadItem::CommandExecution { .. } = started.item { + return Ok(()); + } + } +} + +async fn wait_for_thread_status_not_loaded( + mcp: &mut McpProcess, + thread_id: &str, +) -> Result { + loop { + let status_changed_notif: JSONRPCNotification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/status/changed"), + ) + .await??; + let status_changed_params = status_changed_notif + .params + .context("thread/status/changed params must be present")?; + let status_changed: ThreadStatusChangedNotification = + serde_json::from_value(status_changed_params)?; + if status_changed.thread_id == thread_id && status_changed.status == ThreadStatus::NotLoaded + { + return Ok(status_changed); + } + } +} + +fn create_config_toml(codex_home: &std::path::Path, server_uri: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +model = "mock-model" +approval_policy = "never" +sandbox_mode = "danger-full-access" + +model_provider = "mock_provider" + +[model_providers.mock_provider] +name = "Mock provider for test" +base_url = "{server_uri}/v1" +wire_api = "responses" +request_max_retries = 0 +stream_max_retries = 0 +"# + ), + ) +} + +async fn start_thread(mcp: &mut McpProcess) -> Result { + let req_id = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(req_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(resp)?; + Ok(thread.id) +}