From 45b7763c3f2b52161d384e5769285c7bb6722d08 Mon Sep 17 00:00:00 2001 From: Matthew Zeng Date: Sun, 8 Feb 2026 15:24:56 -0800 Subject: [PATCH] [apps] Improve app loading. (#10994) There are two concepts of apps that we load in the harness: - Directory apps, which is all the apps that the user can install. - Accessible apps, which is what the user actually installed and can be $ inserted and be used by the model. These are extracted from the tools that are loaded through the gateway MCP. Previously we wait for both sets of apps before returning the full apps list. Which causes many issues because accessible apps won't be available to the UI or the model if directory apps aren't loaded or failed to load. In this PR we are separating them so that accessible apps can be loaded separately and are instantly available to be shown in the UI and to be provided in model context. We also added an app-server event so that clients can subscribe to also get accessible apps without being blocked on the full app list. - [x] Separate accessible apps and directory apps loading. - [x] `app/list` request will also emit `app/list/updated` notifications that app-server clients can subscribe. Which allows clients to get accessible apps list to render in the $ menu without being blocked by directory apps. - [x] Cache both accessible and directory apps with 1 hour TTL to avoid reloading them when creating new threads. - [x] TUI improvements to redraw $ menu and /apps menu when app list is updated. --- .../schema/json/ClientRequest.json | 5 + .../schema/json/ServerNotification.json | 85 +++++ .../codex_app_server_protocol.schemas.json | 44 +++ .../json/v2/AppListUpdatedNotification.json | 69 ++++ .../schema/json/v2/AppsListParams.json | 5 + .../schema/json/v2/AppsListResponse.json | 2 + .../schema/typescript/ServerNotification.ts | 3 +- .../schema/typescript/v2/AppInfo.ts | 3 + .../v2/AppListUpdatedNotification.ts | 9 + .../schema/typescript/v2/AppsListParams.ts | 9 +- .../schema/typescript/v2/AppsListResponse.ts | 3 + .../schema/typescript/v2/index.ts | 1 + .../src/protocol/common.rs | 1 + .../app-server-protocol/src/protocol/v2.rs | 14 + codex-rs/app-server/README.md | 27 +- .../app-server/src/codex_message_processor.rs | 202 ++++++++--- .../app-server/tests/suite/v2/app_list.rs | 319 +++++++++++++++++- codex-rs/chatgpt/src/connectors.rs | 86 ++++- codex-rs/core/src/connectors.rs | 109 +++++- codex-rs/tui/src/app.rs | 4 +- codex-rs/tui/src/app_event.rs | 5 +- .../tui/src/bottom_pane/bottom_pane_view.rs | 5 + codex-rs/tui/src/bottom_pane/chat_composer.rs | 38 +++ .../src/bottom_pane/list_selection_view.rs | 8 + codex-rs/tui/src/bottom_pane/mod.rs | 20 ++ codex-rs/tui/src/chatwidget.rs | 106 ++++-- codex-rs/tui/src/chatwidget/tests.rs | 69 ++++ 27 files changed, 1164 insertions(+), 87 deletions(-) create mode 100644 codex-rs/app-server-protocol/schema/json/v2/AppListUpdatedNotification.json create mode 100644 codex-rs/app-server-protocol/schema/typescript/v2/AppListUpdatedNotification.ts diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index aa6c86d87..1f19f8f55 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -21,6 +21,7 @@ "type": "object" }, "AppsListParams": { + "description": "EXPERIMENTAL - list available apps/connectors.", "properties": { "cursor": { "description": "Opaque pagination cursor returned by a previous call.", @@ -29,6 +30,10 @@ "null" ] }, + "forceRefetch": { + "description": "When true, bypass app caches and fetch the latest data from sources.", + "type": "boolean" + }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", "format": "uint32", diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index c82db73cf..a2800e197 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -165,6 +165,71 @@ } ] }, + "AppInfo": { + "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", + "properties": { + "description": { + "type": [ + "string", + "null" + ] + }, + "distributionChannel": { + "type": [ + "string", + "null" + ] + }, + "id": { + "type": "string" + }, + "installUrl": { + "type": [ + "string", + "null" + ] + }, + "isAccessible": { + "default": false, + "type": "boolean" + }, + "logoUrl": { + "type": [ + "string", + "null" + ] + }, + "logoUrlDark": { + "type": [ + "string", + "null" + ] + }, + "name": { + "type": "string" + } + }, + "required": [ + "id", + "name" + ], + "type": "object" + }, + "AppListUpdatedNotification": { + "description": "EXPERIMENTAL - notification emitted when the app list changes.", + "properties": { + "data": { + "items": { + "$ref": "#/definitions/AppInfo" + }, + "type": "array" + } + }, + "required": [ + "data" + ], + "type": "object" + }, "AskForApproval": { "description": "Determines the conditions under which the user is consulted to approve running the command proposed by Codex.", "oneOf": [ @@ -7886,6 +7951,26 @@ "title": "Account/rateLimits/updatedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "app/list/updated" + ], + "title": "App/list/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/AppListUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "App/list/updatedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index a4f998df8..6f36a9a89 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -8192,6 +8192,26 @@ "title": "Account/rateLimits/updatedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "app/list/updated" + ], + "title": "App/list/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/AppListUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "App/list/updatedNotification", + "type": "object" + }, { "properties": { "method": { @@ -9965,6 +9985,7 @@ "type": "string" }, "AppInfo": { + "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", "properties": { "description": { "type": [ @@ -10013,11 +10034,29 @@ ], "type": "object" }, + "AppListUpdatedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "description": "EXPERIMENTAL - notification emitted when the app list changes.", + "properties": { + "data": { + "items": { + "$ref": "#/definitions/v2/AppInfo" + }, + "type": "array" + } + }, + "required": [ + "data" + ], + "title": "AppListUpdatedNotification", + "type": "object" + }, "AppsConfig": { "type": "object" }, "AppsListParams": { "$schema": "http://json-schema.org/draft-07/schema#", + "description": "EXPERIMENTAL - list available apps/connectors.", "properties": { "cursor": { "description": "Opaque pagination cursor returned by a previous call.", @@ -10026,6 +10065,10 @@ "null" ] }, + "forceRefetch": { + "description": "When true, bypass app caches and fetch the latest data from sources.", + "type": "boolean" + }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", "format": "uint32", @@ -10041,6 +10084,7 @@ }, "AppsListResponse": { "$schema": "http://json-schema.org/draft-07/schema#", + "description": "EXPERIMENTAL - app list response.", "properties": { "data": { "items": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/AppListUpdatedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/AppListUpdatedNotification.json new file mode 100644 index 000000000..cf75f10ce --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/AppListUpdatedNotification.json @@ -0,0 +1,69 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "AppInfo": { + "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", + "properties": { + "description": { + "type": [ + "string", + "null" + ] + }, + "distributionChannel": { + "type": [ + "string", + "null" + ] + }, + "id": { + "type": "string" + }, + "installUrl": { + "type": [ + "string", + "null" + ] + }, + "isAccessible": { + "default": false, + "type": "boolean" + }, + "logoUrl": { + "type": [ + "string", + "null" + ] + }, + "logoUrlDark": { + "type": [ + "string", + "null" + ] + }, + "name": { + "type": "string" + } + }, + "required": [ + "id", + "name" + ], + "type": "object" + } + }, + "description": "EXPERIMENTAL - notification emitted when the app list changes.", + "properties": { + "data": { + "items": { + "$ref": "#/definitions/AppInfo" + }, + "type": "array" + } + }, + "required": [ + "data" + ], + "title": "AppListUpdatedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json b/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json index 3625f7b30..0369fec38 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json @@ -1,5 +1,6 @@ { "$schema": "http://json-schema.org/draft-07/schema#", + "description": "EXPERIMENTAL - list available apps/connectors.", "properties": { "cursor": { "description": "Opaque pagination cursor returned by a previous call.", @@ -8,6 +9,10 @@ "null" ] }, + "forceRefetch": { + "description": "When true, bypass app caches and fetch the latest data from sources.", + "type": "boolean" + }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", "format": "uint32", diff --git a/codex-rs/app-server-protocol/schema/json/v2/AppsListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/AppsListResponse.json index f5cac7077..d2b5cc5ef 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/AppsListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/AppsListResponse.json @@ -2,6 +2,7 @@ "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { "AppInfo": { + "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", "properties": { "description": { "type": [ @@ -51,6 +52,7 @@ "type": "object" } }, + "description": "EXPERIMENTAL - app list response.", "properties": { "data": { "items": { diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index 403617fcd..2e1b1e963 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -8,6 +8,7 @@ import type { AccountLoginCompletedNotification } from "./v2/AccountLoginComplet import type { AccountRateLimitsUpdatedNotification } from "./v2/AccountRateLimitsUpdatedNotification"; import type { AccountUpdatedNotification } from "./v2/AccountUpdatedNotification"; import type { AgentMessageDeltaNotification } from "./v2/AgentMessageDeltaNotification"; +import type { AppListUpdatedNotification } from "./v2/AppListUpdatedNotification"; import type { CommandExecutionOutputDeltaNotification } from "./v2/CommandExecutionOutputDeltaNotification"; import type { ConfigWarningNotification } from "./v2/ConfigWarningNotification"; import type { ContextCompactedNotification } from "./v2/ContextCompactedNotification"; @@ -36,4 +37,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppInfo.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppInfo.ts index 6e959cc2e..0957c0dd4 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/AppInfo.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppInfo.ts @@ -2,4 +2,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +/** + * EXPERIMENTAL - app metadata returned by app-list APIs. + */ export type AppInfo = { id: string, name: string, description: string | null, logoUrl: string | null, logoUrlDark: string | null, distributionChannel: string | null, installUrl: string | null, isAccessible: boolean, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppListUpdatedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppListUpdatedNotification.ts new file mode 100644 index 000000000..c6ad87f2c --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppListUpdatedNotification.ts @@ -0,0 +1,9 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { AppInfo } from "./AppInfo"; + +/** + * EXPERIMENTAL - notification emitted when the app list changes. + */ +export type AppListUpdatedNotification = { data: Array, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts index a3e6fbf62..3bd769756 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts @@ -2,6 +2,9 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +/** + * EXPERIMENTAL - list available apps/connectors. + */ export type AppsListParams = { /** * Opaque pagination cursor returned by a previous call. @@ -10,4 +13,8 @@ cursor?: string | null, /** * Optional page size; defaults to a reasonable server-side value. */ -limit?: number | null, }; +limit?: number | null, +/** + * When true, bypass app caches and fetch the latest data from sources. + */ +forceRefetch?: boolean, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListResponse.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListResponse.ts index b6f5c653f..cb1e45f20 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListResponse.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListResponse.ts @@ -3,6 +3,9 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { AppInfo } from "./AppInfo"; +/** + * EXPERIMENTAL - app list response. + */ export type AppsListResponse = { data: Array, /** * Opaque cursor to pass to the next call to continue after the last item. diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 3c6caf034..05e8322e4 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -8,6 +8,7 @@ export type { AgentMessageDeltaNotification } from "./AgentMessageDeltaNotificat export type { AnalyticsConfig } from "./AnalyticsConfig"; export type { AppDisabledReason } from "./AppDisabledReason"; export type { AppInfo } from "./AppInfo"; +export type { AppListUpdatedNotification } from "./AppListUpdatedNotification"; export type { AppsConfig } from "./AppsConfig"; export type { AppsListParams } from "./AppsListParams"; export type { AppsListResponse } from "./AppsListResponse"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 9e0a5f8d7..dc9d97bf9 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -721,6 +721,7 @@ server_notification_definitions! { McpServerOauthLoginCompleted => "mcpServer/oauthLogin/completed" (v2::McpServerOauthLoginCompletedNotification), AccountUpdated => "account/updated" (v2::AccountUpdatedNotification), AccountRateLimitsUpdated => "account/rateLimits/updated" (v2::AccountRateLimitsUpdatedNotification), + AppListUpdated => "app/list/updated" (v2::AppListUpdatedNotification), ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification), ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification), ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index fd0c33e8f..0d42f87f1 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1193,6 +1193,7 @@ pub struct ListMcpServerStatusResponse { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] +/// EXPERIMENTAL - list available apps/connectors. pub struct AppsListParams { /// Opaque pagination cursor returned by a previous call. #[ts(optional = nullable)] @@ -1200,11 +1201,15 @@ pub struct AppsListParams { /// Optional page size; defaults to a reasonable server-side value. #[ts(optional = nullable)] pub limit: Option, + /// When true, bypass app caches and fetch the latest data from sources. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub force_refetch: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] +/// EXPERIMENTAL - app metadata returned by app-list APIs. pub struct AppInfo { pub id: String, pub name: String, @@ -1220,6 +1225,7 @@ pub struct AppInfo { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] +/// EXPERIMENTAL - app list response. pub struct AppsListResponse { pub data: Vec, /// Opaque cursor to pass to the next call to continue after the last item. @@ -1227,6 +1233,14 @@ pub struct AppsListResponse { pub next_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +/// EXPERIMENTAL - notification emitted when the app list changes. +pub struct AppListUpdatedNotification { + pub data: Vec, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 920761a17..368372f25 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -700,7 +700,8 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat ```json { "method": "app/list", "id": 50, "params": { "cursor": null, - "limit": 50 + "limit": 50, + "forceRefetch": false } } { "id": 50, "result": { "data": [ @@ -719,6 +720,30 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat } } ``` +`app/list` returns after both accessible apps and directory apps are loaded. Set `forceRefetch: true` to bypass app caches and fetch fresh data from sources. Cache entries are only replaced when those refetches succeed. + +The server also emits `app/list/updated` notifications whenever either source (accessible apps or directory apps) finishes loading. Each notification includes the latest merged app list. + +```json +{ + "method": "app/list/updated", + "params": { + "data": [ + { + "id": "demo-app", + "name": "Demo App", + "description": "Example connector for documentation.", + "logoUrl": "https://example.com/demo-app.png", + "logoUrlDark": null, + "distributionChannel": null, + "installUrl": "https://chatgpt.com/apps/demo-app/demo-app", + "isAccessible": true + } + ] + } +} +``` + Invoke an app by inserting `$` in the text input. The slug is derived from the app name and lowercased with non-alphanumeric characters replaced by `-` (for example, "Demo App" becomes `$demo-app`). Add a `mention` input item (recommended) so the server uses the exact `app://` path rather than guessing by name. Example: diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 3f65cf287..b9363b565 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -15,6 +15,8 @@ use codex_app_server_protocol::AccountLoginCompletedNotification; use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::AddConversationListenerParams; use codex_app_server_protocol::AddConversationSubscriptionResponse; +use codex_app_server_protocol::AppInfo; +use codex_app_server_protocol::AppListUpdatedNotification; use codex_app_server_protocol::AppsListParams; use codex_app_server_protocol::AppsListResponse; use codex_app_server_protocol::ArchiveConversationParams; @@ -269,6 +271,7 @@ const THREAD_LIST_MAX_LIMIT: usize = 100; // Duration before a ChatGPT login attempt is abandoned. const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60); +const APP_LIST_LOAD_TIMEOUT: Duration = Duration::from_secs(90); struct ActiveLogin { shutdown_handle: ShutdownHandle, login_id: Uuid, @@ -279,6 +282,11 @@ enum CancelLoginError { NotFound(Uuid), } +enum AppListLoadResult { + Accessible(Result, String>), + Directory(Result, String>), +} + impl Drop for ActiveLogin { fn drop(&mut self) { self.shutdown_handle.shutdown(); @@ -4324,7 +4332,6 @@ impl CodexMessageProcessor { } async fn apps_list(&self, request_id: ConnectionRequestId, params: AppsListParams) { - let AppsListParams { cursor, limit } = params; let config = match self.load_latest_config().await { Ok(config) => config, Err(error) => { @@ -4346,65 +4353,182 @@ impl CodexMessageProcessor { return; } - let connectors = match connectors::list_connectors(&config).await { - Ok(connectors) => connectors, - Err(err) => { - self.send_internal_error(request_id, format!("failed to list apps: {err}")) - .await; - return; - } - }; + let request = request_id.clone(); + let outgoing = Arc::clone(&self.outgoing); + tokio::spawn(async move { + Self::apps_list_task(outgoing, request, params, config).await; + }); + } - let total = connectors.len(); - if total == 0 { - self.outgoing - .send_response( - request_id, - AppsListResponse { - data: Vec::new(), - next_cursor: None, - }, - ) - .await; - return; - } - - let effective_limit = limit.unwrap_or(total as u32).max(1) as usize; - let effective_limit = effective_limit.min(total); + async fn apps_list_task( + outgoing: Arc, + request_id: ConnectionRequestId, + params: AppsListParams, + config: Config, + ) { + let AppsListParams { + cursor, + limit, + force_refetch, + } = params; let start = match cursor { Some(cursor) => match cursor.parse::() { Ok(idx) => idx, Err(_) => { - self.send_invalid_request_error( - request_id, - format!("invalid cursor: {cursor}"), - ) - .await; + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid cursor: {cursor}"), + data: None, + }; + outgoing.send_error(request_id, error).await; return; } }, None => 0, }; - if start > total { - self.send_invalid_request_error( - request_id, - format!("cursor {start} exceeds total apps {total}"), + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let accessible_config = config.clone(); + let accessible_tx = tx.clone(); + tokio::spawn(async move { + let result = connectors::list_accessible_connectors_from_mcp_tools_with_options( + &accessible_config, + force_refetch, ) - .await; - return; + .await + .map_err(|err| format!("failed to load accessible apps: {err}")); + let _ = accessible_tx.send(AppListLoadResult::Accessible(result)); + }); + + tokio::spawn(async move { + let result = connectors::list_all_connectors_with_options(&config, force_refetch) + .await + .map_err(|err| format!("failed to list apps: {err}")); + let _ = tx.send(AppListLoadResult::Directory(result)); + }); + + let mut accessible_connectors: Option> = None; + let mut all_connectors: Option> = None; + let app_list_deadline = tokio::time::Instant::now() + APP_LIST_LOAD_TIMEOUT; + + loop { + let result = match tokio::time::timeout_at(app_list_deadline, rx.recv()).await { + Ok(Some(result)) => result, + Ok(None) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "failed to load app lists".to_string(), + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + } + Err(_) => { + let timeout_seconds = APP_LIST_LOAD_TIMEOUT.as_secs(); + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!( + "timed out waiting for app lists after {timeout_seconds} seconds" + ), + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + } + }; + + match result { + AppListLoadResult::Accessible(Ok(connectors)) => { + accessible_connectors = Some(connectors); + } + AppListLoadResult::Accessible(Err(err)) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: err, + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + } + AppListLoadResult::Directory(Ok(connectors)) => { + all_connectors = Some(connectors); + } + AppListLoadResult::Directory(Err(err)) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: err, + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + } + } + + let merged = Self::merge_loaded_apps( + all_connectors.as_deref(), + accessible_connectors.as_deref(), + ); + Self::send_app_list_updated_notification(&outgoing, merged.clone()).await; + + if accessible_connectors.is_some() && all_connectors.is_some() { + match Self::paginate_apps(merged.as_slice(), start, limit) { + Ok(response) => { + outgoing.send_response(request_id, response).await; + return; + } + Err(error) => { + outgoing.send_error(request_id, error).await; + return; + } + } + } + } + } + + fn merge_loaded_apps( + all_connectors: Option<&[AppInfo]>, + accessible_connectors: Option<&[AppInfo]>, + ) -> Vec { + let all = all_connectors.map_or_else(Vec::new, <[AppInfo]>::to_vec); + let accessible = accessible_connectors.map_or_else(Vec::new, <[AppInfo]>::to_vec); + connectors::merge_connectors_with_accessible(all, accessible) + } + + fn paginate_apps( + connectors: &[AppInfo], + start: usize, + limit: Option, + ) -> Result { + let total = connectors.len(); + if start > total { + return Err(JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("cursor {start} exceeds total apps {total}"), + data: None, + }); } + let effective_limit = limit.unwrap_or(total as u32).max(1) as usize; let end = start.saturating_add(effective_limit).min(total); let data = connectors[start..end].to_vec(); - let next_cursor = if end < total { Some(end.to_string()) } else { None }; - self.outgoing - .send_response(request_id, AppsListResponse { data, next_cursor }) + + Ok(AppsListResponse { data, next_cursor }) + } + + async fn send_app_list_updated_notification( + outgoing: &Arc, + data: Vec, + ) { + outgoing + .send_server_notification(ServerNotification::AppListUpdated( + AppListUpdatedNotification { data }, + )) .await; } diff --git a/codex-rs/app-server/tests/suite/v2/app_list.rs b/codex-rs/app-server/tests/suite/v2/app_list.rs index c6b829f2e..0a5d90af2 100644 --- a/codex-rs/app-server/tests/suite/v2/app_list.rs +++ b/codex-rs/app-server/tests/suite/v2/app_list.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; +use anyhow::bail; use app_test_support::ChatGptAuthFixture; use app_test_support::McpProcess; use app_test_support::to_response; @@ -15,10 +16,13 @@ use axum::http::StatusCode; use axum::http::header::AUTHORIZATION; use axum::routing::get; use codex_app_server_protocol::AppInfo; +use codex_app_server_protocol::AppListUpdatedNotification; use codex_app_server_protocol::AppsListParams; use codex_app_server_protocol::AppsListResponse; +use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerNotification; use codex_core::auth::AuthCredentialsStoreMode; use pretty_assertions::assert_eq; use rmcp::handler::server::ServerHandler; @@ -51,6 +55,7 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { .send_apps_list_request(AppsListParams { limit: Some(50), cursor: None, + force_refetch: false, }) .await?; @@ -67,6 +72,120 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { Ok(()) } +#[tokio::test] +async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<()> { + let connectors = vec![ + AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: Some("https://example.com/alpha.png".to_string()), + logo_url_dark: None, + distribution_channel: None, + install_url: None, + is_accessible: false, + }, + AppInfo { + id: "beta".to_string(), + name: "beta".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: None, + is_accessible: false, + }, + ]; + + let tools = vec![connector_tool("beta", "Beta App")?]; + let (server_url, server_handle) = start_apps_server_with_delays( + connectors.clone(), + tools, + Duration::from_millis(300), + Duration::ZERO, + ) + .await?; + + let codex_home = TempDir::new()?; + write_connectors_config(codex_home.path(), &server_url)?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + force_refetch: false, + }) + .await?; + + let expected_accessible = vec![AppInfo { + id: "beta".to_string(), + name: "Beta App".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()), + is_accessible: true, + }]; + + let first_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(first_update.data, expected_accessible); + + let expected_merged = vec![ + AppInfo { + id: "beta".to_string(), + name: "Beta App".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), + is_accessible: true, + }, + AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: Some("https://example.com/alpha.png".to_string()), + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), + is_accessible: false, + }, + ]; + + let second_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(second_update.data, expected_merged); + + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + + let AppsListResponse { + data: response_data, + next_cursor, + } = to_response(response)?; + assert_eq!(response_data, expected_merged); + assert!(next_cursor.is_none()); + + server_handle.abort(); + let _ = server_handle.await; + Ok(()) +} + #[tokio::test] async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { let connectors = vec![ @@ -93,7 +212,13 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { ]; let tools = vec![connector_tool("beta", "Beta App")?]; - let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?; + let (server_url, server_handle) = start_apps_server_with_delays( + connectors.clone(), + tools, + Duration::ZERO, + Duration::from_millis(300), + ) + .await?; let codex_home = TempDir::new()?; write_connectors_config(codex_home.path(), &server_url)?; @@ -113,16 +238,36 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { .send_apps_list_request(AppsListParams { limit: None, cursor: None, + force_refetch: false, }) .await?; - let response: JSONRPCResponse = timeout( - DEFAULT_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(request_id)), - ) - .await??; - - let AppsListResponse { data, next_cursor } = to_response(response)?; + let first_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!( + first_update.data, + vec![ + AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: Some("https://example.com/alpha.png".to_string()), + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), + is_accessible: false, + }, + AppInfo { + id: "beta".to_string(), + name: "beta".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), + is_accessible: false, + }, + ] + ); let expected = vec![ AppInfo { @@ -147,6 +292,15 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { }, ]; + let second_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(second_update.data, expected); + + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let AppsListResponse { data, next_cursor } = to_response(response)?; assert_eq!(data, expected); assert!(next_cursor.is_none()); @@ -180,7 +334,13 @@ async fn list_apps_paginates_results() -> Result<()> { ]; let tools = vec![connector_tool("beta", "Beta App")?]; - let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?; + let (server_url, server_handle) = start_apps_server_with_delays( + connectors.clone(), + tools, + Duration::ZERO, + Duration::from_millis(300), + ) + .await?; let codex_home = TempDir::new()?; write_connectors_config(codex_home.path(), &server_url)?; @@ -200,6 +360,7 @@ async fn list_apps_paginates_results() -> Result<()> { .send_apps_list_request(AppsListParams { limit: Some(1), cursor: None, + force_refetch: false, }) .await?; let first_response: JSONRPCResponse = timeout( @@ -226,10 +387,18 @@ async fn list_apps_paginates_results() -> Result<()> { assert_eq!(first_page, expected_first); let next_cursor = first_cursor.ok_or_else(|| anyhow::anyhow!("missing cursor"))?; + loop { + let update = read_app_list_updated_notification(&mut mcp).await?; + if update.data.len() == 2 && update.data.iter().any(|connector| connector.is_accessible) { + break; + } + } + let second_request = mcp .send_apps_list_request(AppsListParams { limit: Some(1), cursor: Some(next_cursor), + force_refetch: false, }) .await?; let second_response: JSONRPCResponse = timeout( @@ -260,21 +429,134 @@ async fn list_apps_paginates_results() -> Result<()> { Ok(()) } +#[tokio::test] +async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result<()> { + let connectors = vec![AppInfo { + id: "beta".to_string(), + name: "Beta App".to_string(), + description: Some("Beta connector".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: None, + is_accessible: false, + }]; + let tools = vec![connector_tool("beta", "Beta App")?]; + let (server_url, server_handle) = + start_apps_server_with_delays(connectors, tools, Duration::ZERO, Duration::ZERO).await?; + + let codex_home = TempDir::new()?; + write_connectors_config(codex_home.path(), &server_url)?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let initial_request = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + force_refetch: false, + }) + .await?; + let initial_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(initial_request)), + ) + .await??; + let AppsListResponse { + data: initial_data, + next_cursor: initial_next_cursor, + } = to_response(initial_response)?; + assert!(initial_next_cursor.is_none()); + assert_eq!(initial_data.len(), 1); + assert!(initial_data.iter().all(|app| app.is_accessible)); + + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token-invalid") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let refetch_request = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + force_refetch: true, + }) + .await?; + let refetch_error: JSONRPCError = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(refetch_request)), + ) + .await??; + assert!(refetch_error.error.message.contains("failed to")); + + let cached_request = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + force_refetch: false, + }) + .await?; + let cached_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(cached_request)), + ) + .await??; + let AppsListResponse { + data: cached_data, + next_cursor: cached_next_cursor, + } = to_response(cached_response)?; + + assert_eq!(cached_data, initial_data); + assert!(cached_next_cursor.is_none()); + server_handle.abort(); + Ok(()) +} + +async fn read_app_list_updated_notification( + mcp: &mut McpProcess, +) -> Result { + let notification = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_notification_message("app/list/updated"), + ) + .await??; + let parsed: ServerNotification = notification.try_into()?; + let ServerNotification::AppListUpdated(payload) = parsed else { + bail!("unexpected notification variant"); + }; + Ok(payload) +} + #[derive(Clone)] struct AppsServerState { expected_bearer: String, expected_account_id: String, response: serde_json::Value, + directory_delay: Duration, } #[derive(Clone)] struct AppListMcpServer { tools: Arc>, + tools_delay: Duration, } impl AppListMcpServer { - fn new(tools: Arc>) -> Self { - Self { tools } + fn new(tools: Arc>, tools_delay: Duration) -> Self { + Self { tools, tools_delay } } } @@ -293,7 +575,11 @@ impl ServerHandler for AppListMcpServer { ) -> impl std::future::Future> + Send + '_ { let tools = self.tools.clone(); + let tools_delay = self.tools_delay; async move { + if tools_delay > Duration::ZERO { + tokio::time::sleep(tools_delay).await; + } Ok(ListToolsResult { tools: (*tools).clone(), next_cursor: None, @@ -303,14 +589,17 @@ impl ServerHandler for AppListMcpServer { } } -async fn start_apps_server( +async fn start_apps_server_with_delays( connectors: Vec, tools: Vec, + directory_delay: Duration, + tools_delay: Duration, ) -> Result<(String, JoinHandle<()>)> { let state = AppsServerState { expected_bearer: "Bearer chatgpt-token".to_string(), expected_account_id: "account-123".to_string(), response: json!({ "apps": connectors, "next_token": null }), + directory_delay, }; let state = Arc::new(state); let tools = Arc::new(tools); @@ -321,7 +610,7 @@ async fn start_apps_server( let mcp_service = StreamableHttpService::new( { let tools = tools.clone(); - move || Ok(AppListMcpServer::new(tools.clone())) + move || Ok(AppListMcpServer::new(tools.clone(), tools_delay)) }, Arc::new(LocalSessionManager::default()), StreamableHttpServerConfig::default(), @@ -347,6 +636,10 @@ async fn list_directory_connectors( State(state): State>, headers: HeaderMap, ) -> Result { + if state.directory_delay > Duration::ZERO { + tokio::time::sleep(state.directory_delay).await; + } + let bearer_ok = headers .get(AUTHORIZATION) .and_then(|value| value.to_str().ok()) diff --git a/codex-rs/chatgpt/src/connectors.rs b/codex-rs/chatgpt/src/connectors.rs index 83b727158..396286efd 100644 --- a/codex-rs/chatgpt/src/connectors.rs +++ b/codex-rs/chatgpt/src/connectors.rs @@ -1,18 +1,24 @@ use std::collections::HashMap; +use std::sync::LazyLock; +use std::sync::Mutex as StdMutex; use codex_core::config::Config; use codex_core::features::Feature; +use codex_core::token_data::TokenData; use serde::Deserialize; use std::time::Duration; +use std::time::Instant; use crate::chatgpt_client::chatgpt_get_request_with_timeout; use crate::chatgpt_token::get_chatgpt_token_data; use crate::chatgpt_token::init_chatgpt_token_from_auth; pub use codex_core::connectors::AppInfo; +use codex_core::connectors::CONNECTORS_CACHE_TTL; pub use codex_core::connectors::connector_display_label; use codex_core::connectors::connector_install_url; pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools; +pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options; use codex_core::connectors::merge_connectors; #[derive(Debug, Deserialize)] @@ -38,6 +44,24 @@ struct DirectoryApp { const DIRECTORY_CONNECTORS_TIMEOUT: Duration = Duration::from_secs(60); +#[derive(Clone, PartialEq, Eq)] +struct AllConnectorsCacheKey { + chatgpt_base_url: String, + account_id: Option, + chatgpt_user_id: Option, + is_workspace_account: bool, +} + +#[derive(Clone)] +struct CachedAllConnectors { + key: AllConnectorsCacheKey, + expires_at: Instant, + connectors: Vec, +} + +static ALL_CONNECTORS_CACHE: LazyLock>> = + LazyLock::new(|| StdMutex::new(None)); + pub async fn list_connectors(config: &Config) -> anyhow::Result> { if !config.features.enabled(Feature::Apps) { return Ok(Vec::new()); @@ -48,11 +72,17 @@ pub async fn list_connectors(config: &Config) -> anyhow::Result> { ); let connectors = connectors_result?; let accessible = accessible_result?; - let merged = merge_connectors(connectors, accessible); - Ok(filter_disallowed_connectors(merged)) + Ok(merge_connectors_with_accessible(connectors, accessible)) } pub async fn list_all_connectors(config: &Config) -> anyhow::Result> { + list_all_connectors_with_options(config, false).await +} + +pub async fn list_all_connectors_with_options( + config: &Config, + force_refetch: bool, +) -> anyhow::Result> { if !config.features.enabled(Feature::Apps) { return Ok(Vec::new()); } @@ -61,6 +91,11 @@ pub async fn list_all_connectors(config: &Config) -> anyhow::Result let token_data = get_chatgpt_token_data().ok_or_else(|| anyhow::anyhow!("ChatGPT token not available"))?; + let cache_key = all_connectors_cache_key(config, &token_data); + if !force_refetch && let Some(cached_connectors) = read_cached_all_connectors(&cache_key) { + return Ok(cached_connectors); + } + let mut apps = list_directory_connectors(config).await?; if token_data.id_token.is_workspace_account() { apps.extend(list_workspace_connectors(config).await?); @@ -84,9 +119,56 @@ pub async fn list_all_connectors(config: &Config) -> anyhow::Result .cmp(&right.name) .then_with(|| left.id.cmp(&right.id)) }); + write_cached_all_connectors(cache_key, &connectors); Ok(connectors) } +fn all_connectors_cache_key(config: &Config, token_data: &TokenData) -> AllConnectorsCacheKey { + AllConnectorsCacheKey { + chatgpt_base_url: config.chatgpt_base_url.clone(), + account_id: token_data.account_id.clone(), + chatgpt_user_id: token_data.id_token.chatgpt_user_id.clone(), + is_workspace_account: token_data.id_token.is_workspace_account(), + } +} + +fn read_cached_all_connectors(cache_key: &AllConnectorsCacheKey) -> Option> { + let mut cache_guard = ALL_CONNECTORS_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let now = Instant::now(); + + if let Some(cached) = cache_guard.as_ref() { + if now < cached.expires_at && cached.key == *cache_key { + return Some(cached.connectors.clone()); + } + if now >= cached.expires_at { + *cache_guard = None; + } + } + + None +} + +fn write_cached_all_connectors(cache_key: AllConnectorsCacheKey, connectors: &[AppInfo]) { + let mut cache_guard = ALL_CONNECTORS_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *cache_guard = Some(CachedAllConnectors { + key: cache_key, + expires_at: Instant::now() + CONNECTORS_CACHE_TTL, + connectors: connectors.to_vec(), + }); +} + +pub fn merge_connectors_with_accessible( + connectors: Vec, + accessible_connectors: Vec, +) -> Vec { + let merged = merge_connectors(connectors, accessible_connectors); + filter_disallowed_connectors(merged) +} + async fn list_directory_connectors(config: &Config) -> anyhow::Result> { let mut apps = Vec::new(); let mut next_token: Option = None; diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index afe03fbbe..137b2c005 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -1,6 +1,10 @@ use std::collections::HashMap; use std::env; use std::path::PathBuf; +use std::sync::LazyLock; +use std::sync::Mutex as StdMutex; +use std::time::Duration; +use std::time::Instant; use async_channel::unbounded; pub use codex_app_server_protocol::AppInfo; @@ -8,6 +12,7 @@ use codex_protocol::protocol::SandboxPolicy; use tokio_util::sync::CancellationToken; use crate::AuthManager; +use crate::CodexAuth; use crate::SandboxState; use crate::config::Config; use crate::features::Feature; @@ -16,9 +21,37 @@ use crate::mcp::auth::compute_auth_statuses; use crate::mcp::with_codex_apps_mcp; use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT; use crate::mcp_connection_manager::McpConnectionManager; +use crate::token_data::TokenData; + +pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600); + +#[derive(Clone, PartialEq, Eq)] +struct AccessibleConnectorsCacheKey { + chatgpt_base_url: String, + account_id: Option, + chatgpt_user_id: Option, + is_workspace_account: bool, +} + +#[derive(Clone)] +struct CachedAccessibleConnectors { + key: AccessibleConnectorsCacheKey, + expires_at: Instant, + connectors: Vec, +} + +static ACCESSIBLE_CONNECTORS_CACHE: LazyLock>> = + LazyLock::new(|| StdMutex::new(None)); pub async fn list_accessible_connectors_from_mcp_tools( config: &Config, +) -> anyhow::Result> { + list_accessible_connectors_from_mcp_tools_with_options(config, false).await +} + +pub async fn list_accessible_connectors_from_mcp_tools_with_options( + config: &Config, + force_refetch: bool, ) -> anyhow::Result> { if !config.features.enabled(Feature::Apps) { return Ok(Vec::new()); @@ -26,6 +59,12 @@ pub async fn list_accessible_connectors_from_mcp_tools( let auth_manager = auth_manager_from_config(config); let auth = auth_manager.auth().await; + let cache_key = accessible_connectors_cache_key(config, auth.as_ref()); + if !force_refetch && let Some(cached_connectors) = read_cached_accessible_connectors(&cache_key) + { + return Ok(cached_connectors); + } + let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config); if mcp_servers.is_empty() { return Ok(Vec::new()); @@ -57,17 +96,79 @@ pub async fn list_accessible_connectors_from_mcp_tools( ) .await; - if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) { + let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) { let timeout = cfg.startup_timeout_sec.unwrap_or(DEFAULT_STARTUP_TIMEOUT); mcp_connection_manager .wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout) - .await; - } + .await + } else { + false + }; let tools = mcp_connection_manager.list_all_tools().await; cancel_token.cancel(); - Ok(accessible_connectors_from_mcp_tools(&tools)) + let accessible_connectors = accessible_connectors_from_mcp_tools(&tools); + if codex_apps_ready || !accessible_connectors.is_empty() { + write_cached_accessible_connectors(cache_key, &accessible_connectors); + } + Ok(accessible_connectors) +} + +fn accessible_connectors_cache_key( + config: &Config, + auth: Option<&CodexAuth>, +) -> AccessibleConnectorsCacheKey { + let token_data: Option = auth.and_then(|auth| auth.get_token_data().ok()); + let account_id = token_data + .as_ref() + .and_then(|token_data| token_data.account_id.clone()); + let chatgpt_user_id = token_data + .as_ref() + .and_then(|token_data| token_data.id_token.chatgpt_user_id.clone()); + let is_workspace_account = token_data + .as_ref() + .is_some_and(|token_data| token_data.id_token.is_workspace_account()); + AccessibleConnectorsCacheKey { + chatgpt_base_url: config.chatgpt_base_url.clone(), + account_id, + chatgpt_user_id, + is_workspace_account, + } +} + +fn read_cached_accessible_connectors( + cache_key: &AccessibleConnectorsCacheKey, +) -> Option> { + let mut cache_guard = ACCESSIBLE_CONNECTORS_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let now = Instant::now(); + + if let Some(cached) = cache_guard.as_ref() { + if now < cached.expires_at && cached.key == *cache_key { + return Some(cached.connectors.clone()); + } + if now >= cached.expires_at { + *cache_guard = None; + } + } + + None +} + +fn write_cached_accessible_connectors( + cache_key: AccessibleConnectorsCacheKey, + connectors: &[AppInfo], +) { + let mut cache_guard = ACCESSIBLE_CONNECTORS_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *cache_guard = Some(CachedAccessibleConnectors { + key: cache_key, + expires_at: Instant::now() + CONNECTORS_CACHE_TTL, + connectors: connectors.to_vec(), + }); } fn auth_manager_from_config(config: &Config) -> std::sync::Arc { diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 4d40c0175..2162e0ff1 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -1603,8 +1603,8 @@ impl App { AppEvent::RateLimitSnapshotFetched(snapshot) => { self.chat_widget.on_rate_limit_snapshot(Some(snapshot)); } - AppEvent::ConnectorsLoaded(result) => { - self.chat_widget.on_connectors_loaded(result); + AppEvent::ConnectorsLoaded { result, is_final } => { + self.chat_widget.on_connectors_loaded(result, is_final); } AppEvent::UpdateReasoningEffort(effort) => { self.on_update_reasoning_effort(effort); diff --git a/codex-rs/tui/src/app_event.rs b/codex-rs/tui/src/app_event.rs index bd48e5de1..b48960bbf 100644 --- a/codex-rs/tui/src/app_event.rs +++ b/codex-rs/tui/src/app_event.rs @@ -97,7 +97,10 @@ pub(crate) enum AppEvent { RateLimitSnapshotFetched(RateLimitSnapshot), /// Result of prefetching connectors. - ConnectorsLoaded(Result), + ConnectorsLoaded { + result: Result, + is_final: bool, + }, /// Result of computing a `/diff` command. DiffResult(String), diff --git a/codex-rs/tui/src/bottom_pane/bottom_pane_view.rs b/codex-rs/tui/src/bottom_pane/bottom_pane_view.rs index 039a9ca05..9c40adc69 100644 --- a/codex-rs/tui/src/bottom_pane/bottom_pane_view.rs +++ b/codex-rs/tui/src/bottom_pane/bottom_pane_view.rs @@ -16,6 +16,11 @@ pub(crate) trait BottomPaneView: Renderable { false } + /// Stable identifier for views that need external refreshes while open. + fn view_id(&self) -> Option<&'static str> { + None + } + /// Handle Ctrl-C while this view is active. fn on_ctrl_c(&mut self) -> CancellationEvent { CancellationEvent::NotHandled diff --git a/codex-rs/tui/src/bottom_pane/chat_composer.rs b/codex-rs/tui/src/bottom_pane/chat_composer.rs index 60fea9b08..58b6b5611 100644 --- a/codex-rs/tui/src/bottom_pane/chat_composer.rs +++ b/codex-rs/tui/src/bottom_pane/chat_composer.rs @@ -421,6 +421,7 @@ impl ChatComposer { pub fn set_connector_mentions(&mut self, connectors_snapshot: Option) { self.connectors_snapshot = connectors_snapshot; + self.sync_popups(); } pub(crate) fn take_mention_bindings(&mut self) -> Vec { @@ -4269,6 +4270,43 @@ mod tests { assert_ne!(composer.footer_mode, FooterMode::ShortcutOverlay); } + #[test] + fn set_connector_mentions_refreshes_open_mention_popup() { + let (tx, _rx) = unbounded_channel::(); + let sender = AppEventSender::new(tx); + let mut composer = ChatComposer::new( + true, + sender, + false, + "Ask Codex to do anything".to_string(), + false, + ); + composer.set_connectors_enabled(true); + composer.set_text_content("$".to_string(), Vec::new(), Vec::new()); + assert!(matches!(composer.active_popup, ActivePopup::None)); + + let connectors = vec![AppInfo { + id: "connector_1".to_string(), + name: "Notion".to_string(), + description: Some("Workspace docs".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://example.test/notion".to_string()), + is_accessible: true, + }]; + composer.set_connector_mentions(Some(ConnectorsSnapshot { connectors })); + + let ActivePopup::Skill(popup) = &composer.active_popup else { + panic!("expected mention popup to open after connectors update"); + }; + let mention = popup + .selected_mention() + .expect("expected connector mention to be selected"); + assert_eq!(mention.insert_text, "$notion".to_string()); + assert_eq!(mention.path, Some("app://connector_1".to_string())); + } + #[test] fn shortcut_overlay_persists_while_task_running() { use crossterm::event::KeyCode; diff --git a/codex-rs/tui/src/bottom_pane/list_selection_view.rs b/codex-rs/tui/src/bottom_pane/list_selection_view.rs index 3cf74e3a1..8f20f2bc6 100644 --- a/codex-rs/tui/src/bottom_pane/list_selection_view.rs +++ b/codex-rs/tui/src/bottom_pane/list_selection_view.rs @@ -68,6 +68,7 @@ pub(crate) struct SelectionItem { /// `AutoAllRows` measures all rows to ensure stable column widths as the user scrolls /// `Fixed` used a fixed 30/70 split between columns pub(crate) struct SelectionViewParams { + pub view_id: Option<&'static str>, pub title: Option, pub subtitle: Option, pub footer_note: Option>, @@ -83,6 +84,7 @@ pub(crate) struct SelectionViewParams { impl Default for SelectionViewParams { fn default() -> Self { Self { + view_id: None, title: None, subtitle: None, footer_note: None, @@ -103,6 +105,7 @@ impl Default for SelectionViewParams { /// visible rows and source items and for preserving selection while filters /// change. pub(crate) struct ListSelectionView { + view_id: Option<&'static str>, footer_note: Option>, footer_hint: Option>, items: Vec, @@ -139,6 +142,7 @@ impl ListSelectionView { ])); } let mut s = Self { + view_id: params.view_id, footer_note: params.footer_note, footer_hint: params.footer_hint, items: params.items, @@ -460,6 +464,10 @@ impl BottomPaneView for ListSelectionView { self.complete } + fn view_id(&self) -> Option<&'static str> { + self.view_id + } + fn on_ctrl_c(&mut self) -> CancellationEvent { self.complete = true; CancellationEvent::Handled diff --git a/codex-rs/tui/src/bottom_pane/mod.rs b/codex-rs/tui/src/bottom_pane/mod.rs index 960e67fed..226e91abb 100644 --- a/codex-rs/tui/src/bottom_pane/mod.rs +++ b/codex-rs/tui/src/bottom_pane/mod.rs @@ -658,6 +658,26 @@ impl BottomPane { self.push_view(Box::new(view)); } + /// Replace the active selection view when it matches `view_id`. + pub(crate) fn replace_selection_view_if_active( + &mut self, + view_id: &'static str, + params: list_selection_view::SelectionViewParams, + ) -> bool { + let is_match = self + .view_stack + .last() + .is_some_and(|view| view.view_id() == Some(view_id)); + if !is_match { + return false; + } + + self.view_stack.pop(); + let view = list_selection_view::ListSelectionView::new(params, self.app_event_tx.clone()); + self.push_view(Box::new(view)); + true + } + /// Update the queued messages preview shown above the composer. pub(crate) fn set_queued_user_messages(&mut self, queued: Vec) { self.queued_user_messages.messages = queued; diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 9746a0974..0f98e0f49 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -145,12 +145,14 @@ use ratatui::widgets::Wrap; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tracing::debug; +use tracing::warn; const DEFAULT_MODEL_DISPLAY_NAME: &str = "loading"; const PLAN_IMPLEMENTATION_TITLE: &str = "Implement this plan?"; const PLAN_IMPLEMENTATION_YES: &str = "Yes, implement this plan"; const PLAN_IMPLEMENTATION_NO: &str = "No, stay in Plan mode"; const PLAN_IMPLEMENTATION_CODING_MESSAGE: &str = "Implement the plan."; +const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection"; use crate::app_event::AppEvent; use crate::app_event::ConnectorsSnapshot; @@ -539,6 +541,7 @@ pub(crate) struct ChatWidget { /// currently executing. mcp_startup_status: Option>, connectors_cache: ConnectorsCacheState, + connectors_prefetch_in_flight: bool, // Queue of interruptive UI events deferred during an active write cycle interrupts: InterruptManager, // Accumulates the current reasoning block text to extract a header @@ -1013,7 +1016,6 @@ impl ChatWidget { self.bottom_pane .set_history_metadata(event.history_log_id, event.history_entry_count); self.set_skills(None); - self.bottom_pane.set_connectors_snapshot(None); self.thread_id = Some(event.session_id); self.thread_name = event.thread_name.clone(); self.forked_from = event.forked_from_id; @@ -2619,6 +2621,7 @@ impl ChatWidget { agent_turn_running: false, mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), + connectors_prefetch_in_flight: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -2782,6 +2785,7 @@ impl ChatWidget { agent_turn_running: false, mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), + connectors_prefetch_in_flight: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -2934,6 +2938,7 @@ impl ChatWidget { agent_turn_running: false, mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), + connectors_prefetch_in_flight: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -4470,24 +4475,52 @@ impl ChatWidget { } fn prefetch_connectors(&mut self) { - if !self.connectors_enabled() { - return; - } - if matches!(self.connectors_cache, ConnectorsCacheState::Loading) { + if !self.connectors_enabled() || self.connectors_prefetch_in_flight { return; } - self.connectors_cache = ConnectorsCacheState::Loading; + self.connectors_prefetch_in_flight = true; + if !matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) { + self.connectors_cache = ConnectorsCacheState::Loading; + } + let config = self.config.clone(); let app_event_tx = self.app_event_tx.clone(); tokio::spawn(async move { - let result: Result = async { - let connectors = connectors::list_connectors(&config).await?; + let accessible_connectors = + match connectors::list_accessible_connectors_from_mcp_tools(&config).await { + Ok(connectors) => connectors, + Err(err) => { + app_event_tx.send(AppEvent::ConnectorsLoaded { + result: Err(format!("Failed to load apps: {err}")), + is_final: true, + }); + return; + } + }; + + app_event_tx.send(AppEvent::ConnectorsLoaded { + result: Ok(ConnectorsSnapshot { + connectors: accessible_connectors.clone(), + }), + is_final: false, + }); + + let result: Result = async { + let all_connectors = connectors::list_all_connectors(&config).await?; + let connectors = connectors::merge_connectors_with_accessible( + all_connectors, + accessible_connectors, + ); Ok(ConnectorsSnapshot { connectors }) } - .await; - let result = result.map_err(|err| format!("Failed to load apps: {err}")); - app_event_tx.send(AppEvent::ConnectorsLoaded(result)); + .await + .map_err(|err: anyhow::Error| format!("Failed to load apps: {err}")); + + app_event_tx.send(AppEvent::ConnectorsLoaded { + result, + is_final: true, + }); }); } @@ -6345,6 +6378,11 @@ impl ChatWidget { } fn open_connectors_popup(&mut self, connectors: &[connectors::AppInfo]) { + self.bottom_pane + .show_selection_view(self.connectors_popup_params(connectors)); + } + + fn connectors_popup_params(&self, connectors: &[connectors::AppInfo]) -> SelectionViewParams { let total = connectors.len(); let installed = connectors .iter() @@ -6412,7 +6450,8 @@ impl ChatWidget { items.push(item); } - self.bottom_pane.show_selection_view(SelectionViewParams { + SelectionViewParams { + view_id: Some(CONNECTORS_SELECTION_VIEW_ID), header: Box::new(header), footer_hint: Some(Self::connectors_popup_hint_line()), items, @@ -6420,7 +6459,14 @@ impl ChatWidget { search_placeholder: Some("Type to search apps".to_string()), col_width_mode: ColumnWidthMode::AutoAllRows, ..Default::default() - }); + } + } + + fn refresh_connectors_popup_if_open(&mut self, connectors: &[connectors::AppInfo]) { + let _ = self.bottom_pane.replace_selection_view_if_active( + CONNECTORS_SELECTION_VIEW_ID, + self.connectors_popup_params(connectors), + ); } fn connectors_popup_hint_line() -> Line<'static> { @@ -6659,16 +6705,30 @@ impl ChatWidget { self.set_skills_from_response(&ev); } - pub(crate) fn on_connectors_loaded(&mut self, result: Result) { - self.connectors_cache = match result { - Ok(connectors) => ConnectorsCacheState::Ready(connectors), - Err(err) => ConnectorsCacheState::Failed(err), - }; - if let ConnectorsCacheState::Ready(snapshot) = &self.connectors_cache { - self.bottom_pane - .set_connectors_snapshot(Some(snapshot.clone())); - } else { - self.bottom_pane.set_connectors_snapshot(None); + pub(crate) fn on_connectors_loaded( + &mut self, + result: Result, + is_final: bool, + ) { + if is_final { + self.connectors_prefetch_in_flight = false; + } + + match result { + Ok(snapshot) => { + self.refresh_connectors_popup_if_open(&snapshot.connectors); + self.connectors_cache = ConnectorsCacheState::Ready(snapshot.clone()); + self.bottom_pane.set_connectors_snapshot(Some(snapshot)); + } + Err(err) => { + if matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) { + warn!("failed to refresh apps list; retaining current apps snapshot: {err}"); + return; + } + + self.connectors_cache = ConnectorsCacheState::Failed(err); + self.bottom_pane.set_connectors_snapshot(None); + } } } diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index bf173ceca..43d2ddcb8 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -1074,6 +1074,7 @@ async fn make_chatwidget_manual( agent_turn_running: false, mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), + connectors_prefetch_in_flight: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -3667,6 +3668,74 @@ fn render_bottom_popup(chat: &ChatWidget, width: u16) -> String { lines.join("\n") } +#[tokio::test] +async fn apps_popup_refreshes_when_connectors_snapshot_updates() { + let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await; + chat.config.features.enable(Feature::Apps); + chat.bottom_pane.set_connectors_enabled(true); + + chat.on_connectors_loaded( + Ok(ConnectorsSnapshot { + connectors: vec![codex_chatgpt::connectors::AppInfo { + id: "connector_1".to_string(), + name: "Notion".to_string(), + description: Some("Workspace docs".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://example.test/notion".to_string()), + is_accessible: true, + }], + }), + false, + ); + chat.add_connectors_output(); + + let before = render_bottom_popup(&chat, 80); + assert!( + before.contains("Installed 1 of 1 available apps."), + "expected initial apps popup snapshot, got:\n{before}" + ); + + chat.on_connectors_loaded( + Ok(ConnectorsSnapshot { + connectors: vec![ + codex_chatgpt::connectors::AppInfo { + id: "connector_1".to_string(), + name: "Notion".to_string(), + description: Some("Workspace docs".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://example.test/notion".to_string()), + is_accessible: true, + }, + codex_chatgpt::connectors::AppInfo { + id: "connector_2".to_string(), + name: "Linear".to_string(), + description: Some("Project tracking".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://example.test/linear".to_string()), + is_accessible: true, + }, + ], + }), + true, + ); + + let after = render_bottom_popup(&chat, 80); + assert!( + after.contains("Installed 2 of 2 available apps."), + "expected refreshed apps popup snapshot, got:\n{after}" + ); + assert!( + after.contains("Linear"), + "expected refreshed popup to include new connector, got:\n{after}" + ); +} + #[tokio::test] async fn experimental_features_popup_snapshot() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;