diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index aa6c86d87..1f19f8f55 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -21,6 +21,7 @@ "type": "object" }, "AppsListParams": { + "description": "EXPERIMENTAL - list available apps/connectors.", "properties": { "cursor": { "description": "Opaque pagination cursor returned by a previous call.", @@ -29,6 +30,10 @@ "null" ] }, + "forceRefetch": { + "description": "When true, bypass app caches and fetch the latest data from sources.", + "type": "boolean" + }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", "format": "uint32", diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index c82db73cf..a2800e197 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -165,6 +165,71 @@ } ] }, + "AppInfo": { + "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", + "properties": { + "description": { + "type": [ + "string", + "null" + ] + }, + "distributionChannel": { + "type": [ + "string", + "null" + ] + }, + "id": { + "type": "string" + }, + "installUrl": { + "type": [ + "string", + "null" + ] + }, + "isAccessible": { + "default": false, + "type": "boolean" + }, + "logoUrl": { + "type": [ + "string", + "null" + ] + }, + "logoUrlDark": { + "type": [ + "string", + "null" + ] + }, + "name": { + "type": "string" + } + }, + "required": [ + "id", + "name" + ], + "type": "object" + }, + "AppListUpdatedNotification": { + "description": "EXPERIMENTAL - notification emitted when the app list changes.", + "properties": { + "data": { + "items": { + "$ref": "#/definitions/AppInfo" + }, + "type": "array" + } + }, + "required": [ + "data" + ], + "type": "object" + }, "AskForApproval": { "description": "Determines the conditions under which the user is consulted to approve running the command proposed by Codex.", "oneOf": [ @@ -7886,6 +7951,26 @@ "title": "Account/rateLimits/updatedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "app/list/updated" + ], + "title": "App/list/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/AppListUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "App/list/updatedNotification", + "type": "object" + }, { "properties": { "method": { diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index a4f998df8..6f36a9a89 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -8192,6 +8192,26 @@ "title": "Account/rateLimits/updatedNotification", "type": "object" }, + { + "properties": { + "method": { + "enum": [ + "app/list/updated" + ], + "title": "App/list/updatedNotificationMethod", + "type": "string" + }, + "params": { + "$ref": "#/definitions/v2/AppListUpdatedNotification" + } + }, + "required": [ + "method", + "params" + ], + "title": "App/list/updatedNotification", + "type": "object" + }, { "properties": { "method": { @@ -9965,6 +9985,7 @@ "type": "string" }, "AppInfo": { + "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", "properties": { "description": { "type": [ @@ -10013,11 +10034,29 @@ ], "type": "object" }, + "AppListUpdatedNotification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "description": "EXPERIMENTAL - notification emitted when the app list changes.", + "properties": { + "data": { + "items": { + "$ref": "#/definitions/v2/AppInfo" + }, + "type": "array" + } + }, + "required": [ + "data" + ], + "title": "AppListUpdatedNotification", + "type": "object" + }, "AppsConfig": { "type": "object" }, "AppsListParams": { "$schema": "http://json-schema.org/draft-07/schema#", + "description": "EXPERIMENTAL - list available apps/connectors.", "properties": { "cursor": { "description": "Opaque pagination cursor returned by a previous call.", @@ -10026,6 +10065,10 @@ "null" ] }, + "forceRefetch": { + "description": "When true, bypass app caches and fetch the latest data from sources.", + "type": "boolean" + }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", "format": "uint32", @@ -10041,6 +10084,7 @@ }, "AppsListResponse": { "$schema": "http://json-schema.org/draft-07/schema#", + "description": "EXPERIMENTAL - app list response.", "properties": { "data": { "items": { diff --git a/codex-rs/app-server-protocol/schema/json/v2/AppListUpdatedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/AppListUpdatedNotification.json new file mode 100644 index 000000000..cf75f10ce --- /dev/null +++ b/codex-rs/app-server-protocol/schema/json/v2/AppListUpdatedNotification.json @@ -0,0 +1,69 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "AppInfo": { + "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", + "properties": { + "description": { + "type": [ + "string", + "null" + ] + }, + "distributionChannel": { + "type": [ + "string", + "null" + ] + }, + "id": { + "type": "string" + }, + "installUrl": { + "type": [ + "string", + "null" + ] + }, + "isAccessible": { + "default": false, + "type": "boolean" + }, + "logoUrl": { + "type": [ + "string", + "null" + ] + }, + "logoUrlDark": { + "type": [ + "string", + "null" + ] + }, + "name": { + "type": "string" + } + }, + "required": [ + "id", + "name" + ], + "type": "object" + } + }, + "description": "EXPERIMENTAL - notification emitted when the app list changes.", + "properties": { + "data": { + "items": { + "$ref": "#/definitions/AppInfo" + }, + "type": "array" + } + }, + "required": [ + "data" + ], + "title": "AppListUpdatedNotification", + "type": "object" +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json b/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json index 3625f7b30..0369fec38 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json @@ -1,5 +1,6 @@ { "$schema": "http://json-schema.org/draft-07/schema#", + "description": "EXPERIMENTAL - list available apps/connectors.", "properties": { "cursor": { "description": "Opaque pagination cursor returned by a previous call.", @@ -8,6 +9,10 @@ "null" ] }, + "forceRefetch": { + "description": "When true, bypass app caches and fetch the latest data from sources.", + "type": "boolean" + }, "limit": { "description": "Optional page size; defaults to a reasonable server-side value.", "format": "uint32", diff --git a/codex-rs/app-server-protocol/schema/json/v2/AppsListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/AppsListResponse.json index f5cac7077..d2b5cc5ef 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/AppsListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/AppsListResponse.json @@ -2,6 +2,7 @@ "$schema": "http://json-schema.org/draft-07/schema#", "definitions": { "AppInfo": { + "description": "EXPERIMENTAL - app metadata returned by app-list APIs.", "properties": { "description": { "type": [ @@ -51,6 +52,7 @@ "type": "object" } }, + "description": "EXPERIMENTAL - app list response.", "properties": { "data": { "items": { diff --git a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts index 403617fcd..2e1b1e963 100644 --- a/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts +++ b/codex-rs/app-server-protocol/schema/typescript/ServerNotification.ts @@ -8,6 +8,7 @@ import type { AccountLoginCompletedNotification } from "./v2/AccountLoginComplet import type { AccountRateLimitsUpdatedNotification } from "./v2/AccountRateLimitsUpdatedNotification"; import type { AccountUpdatedNotification } from "./v2/AccountUpdatedNotification"; import type { AgentMessageDeltaNotification } from "./v2/AgentMessageDeltaNotification"; +import type { AppListUpdatedNotification } from "./v2/AppListUpdatedNotification"; import type { CommandExecutionOutputDeltaNotification } from "./v2/CommandExecutionOutputDeltaNotification"; import type { ConfigWarningNotification } from "./v2/ConfigWarningNotification"; import type { ContextCompactedNotification } from "./v2/ContextCompactedNotification"; @@ -36,4 +37,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW /** * Notification sent from the server to the client. */ -export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; +export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppInfo.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppInfo.ts index 6e959cc2e..0957c0dd4 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/AppInfo.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppInfo.ts @@ -2,4 +2,7 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +/** + * EXPERIMENTAL - app metadata returned by app-list APIs. + */ export type AppInfo = { id: string, name: string, description: string | null, logoUrl: string | null, logoUrlDark: string | null, distributionChannel: string | null, installUrl: string | null, isAccessible: boolean, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppListUpdatedNotification.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppListUpdatedNotification.ts new file mode 100644 index 000000000..c6ad87f2c --- /dev/null +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppListUpdatedNotification.ts @@ -0,0 +1,9 @@ +// GENERATED CODE! DO NOT MODIFY BY HAND! + +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { AppInfo } from "./AppInfo"; + +/** + * EXPERIMENTAL - notification emitted when the app list changes. + */ +export type AppListUpdatedNotification = { data: Array, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts index a3e6fbf62..3bd769756 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts @@ -2,6 +2,9 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +/** + * EXPERIMENTAL - list available apps/connectors. + */ export type AppsListParams = { /** * Opaque pagination cursor returned by a previous call. @@ -10,4 +13,8 @@ cursor?: string | null, /** * Optional page size; defaults to a reasonable server-side value. */ -limit?: number | null, }; +limit?: number | null, +/** + * When true, bypass app caches and fetch the latest data from sources. + */ +forceRefetch?: boolean, }; diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListResponse.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListResponse.ts index b6f5c653f..cb1e45f20 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListResponse.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListResponse.ts @@ -3,6 +3,9 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. import type { AppInfo } from "./AppInfo"; +/** + * EXPERIMENTAL - app list response. + */ export type AppsListResponse = { data: Array, /** * Opaque cursor to pass to the next call to continue after the last item. diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts index 3c6caf034..05e8322e4 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/index.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/index.ts @@ -8,6 +8,7 @@ export type { AgentMessageDeltaNotification } from "./AgentMessageDeltaNotificat export type { AnalyticsConfig } from "./AnalyticsConfig"; export type { AppDisabledReason } from "./AppDisabledReason"; export type { AppInfo } from "./AppInfo"; +export type { AppListUpdatedNotification } from "./AppListUpdatedNotification"; export type { AppsConfig } from "./AppsConfig"; export type { AppsListParams } from "./AppsListParams"; export type { AppsListResponse } from "./AppsListResponse"; diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 9e0a5f8d7..dc9d97bf9 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -721,6 +721,7 @@ server_notification_definitions! { McpServerOauthLoginCompleted => "mcpServer/oauthLogin/completed" (v2::McpServerOauthLoginCompletedNotification), AccountUpdated => "account/updated" (v2::AccountUpdatedNotification), AccountRateLimitsUpdated => "account/rateLimits/updated" (v2::AccountRateLimitsUpdatedNotification), + AppListUpdated => "app/list/updated" (v2::AppListUpdatedNotification), ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification), ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification), ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index fd0c33e8f..0d42f87f1 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1193,6 +1193,7 @@ pub struct ListMcpServerStatusResponse { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] +/// EXPERIMENTAL - list available apps/connectors. pub struct AppsListParams { /// Opaque pagination cursor returned by a previous call. #[ts(optional = nullable)] @@ -1200,11 +1201,15 @@ pub struct AppsListParams { /// Optional page size; defaults to a reasonable server-side value. #[ts(optional = nullable)] pub limit: Option, + /// When true, bypass app caches and fetch the latest data from sources. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub force_refetch: bool, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] +/// EXPERIMENTAL - app metadata returned by app-list APIs. pub struct AppInfo { pub id: String, pub name: String, @@ -1220,6 +1225,7 @@ pub struct AppInfo { #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] +/// EXPERIMENTAL - app list response. pub struct AppsListResponse { pub data: Vec, /// Opaque cursor to pass to the next call to continue after the last item. @@ -1227,6 +1233,14 @@ pub struct AppsListResponse { pub next_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +/// EXPERIMENTAL - notification emitted when the app list changes. +pub struct AppListUpdatedNotification { + pub data: Vec, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 920761a17..368372f25 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -700,7 +700,8 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat ```json { "method": "app/list", "id": 50, "params": { "cursor": null, - "limit": 50 + "limit": 50, + "forceRefetch": false } } { "id": 50, "result": { "data": [ @@ -719,6 +720,30 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat } } ``` +`app/list` returns after both accessible apps and directory apps are loaded. Set `forceRefetch: true` to bypass app caches and fetch fresh data from sources. Cache entries are only replaced when those refetches succeed. + +The server also emits `app/list/updated` notifications whenever either source (accessible apps or directory apps) finishes loading. Each notification includes the latest merged app list. + +```json +{ + "method": "app/list/updated", + "params": { + "data": [ + { + "id": "demo-app", + "name": "Demo App", + "description": "Example connector for documentation.", + "logoUrl": "https://example.com/demo-app.png", + "logoUrlDark": null, + "distributionChannel": null, + "installUrl": "https://chatgpt.com/apps/demo-app/demo-app", + "isAccessible": true + } + ] + } +} +``` + Invoke an app by inserting `$` in the text input. The slug is derived from the app name and lowercased with non-alphanumeric characters replaced by `-` (for example, "Demo App" becomes `$demo-app`). Add a `mention` input item (recommended) so the server uses the exact `app://` path rather than guessing by name. Example: diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 3f65cf287..b9363b565 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -15,6 +15,8 @@ use codex_app_server_protocol::AccountLoginCompletedNotification; use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::AddConversationListenerParams; use codex_app_server_protocol::AddConversationSubscriptionResponse; +use codex_app_server_protocol::AppInfo; +use codex_app_server_protocol::AppListUpdatedNotification; use codex_app_server_protocol::AppsListParams; use codex_app_server_protocol::AppsListResponse; use codex_app_server_protocol::ArchiveConversationParams; @@ -269,6 +271,7 @@ const THREAD_LIST_MAX_LIMIT: usize = 100; // Duration before a ChatGPT login attempt is abandoned. const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60); +const APP_LIST_LOAD_TIMEOUT: Duration = Duration::from_secs(90); struct ActiveLogin { shutdown_handle: ShutdownHandle, login_id: Uuid, @@ -279,6 +282,11 @@ enum CancelLoginError { NotFound(Uuid), } +enum AppListLoadResult { + Accessible(Result, String>), + Directory(Result, String>), +} + impl Drop for ActiveLogin { fn drop(&mut self) { self.shutdown_handle.shutdown(); @@ -4324,7 +4332,6 @@ impl CodexMessageProcessor { } async fn apps_list(&self, request_id: ConnectionRequestId, params: AppsListParams) { - let AppsListParams { cursor, limit } = params; let config = match self.load_latest_config().await { Ok(config) => config, Err(error) => { @@ -4346,65 +4353,182 @@ impl CodexMessageProcessor { return; } - let connectors = match connectors::list_connectors(&config).await { - Ok(connectors) => connectors, - Err(err) => { - self.send_internal_error(request_id, format!("failed to list apps: {err}")) - .await; - return; - } - }; + let request = request_id.clone(); + let outgoing = Arc::clone(&self.outgoing); + tokio::spawn(async move { + Self::apps_list_task(outgoing, request, params, config).await; + }); + } - let total = connectors.len(); - if total == 0 { - self.outgoing - .send_response( - request_id, - AppsListResponse { - data: Vec::new(), - next_cursor: None, - }, - ) - .await; - return; - } - - let effective_limit = limit.unwrap_or(total as u32).max(1) as usize; - let effective_limit = effective_limit.min(total); + async fn apps_list_task( + outgoing: Arc, + request_id: ConnectionRequestId, + params: AppsListParams, + config: Config, + ) { + let AppsListParams { + cursor, + limit, + force_refetch, + } = params; let start = match cursor { Some(cursor) => match cursor.parse::() { Ok(idx) => idx, Err(_) => { - self.send_invalid_request_error( - request_id, - format!("invalid cursor: {cursor}"), - ) - .await; + let error = JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("invalid cursor: {cursor}"), + data: None, + }; + outgoing.send_error(request_id, error).await; return; } }, None => 0, }; - if start > total { - self.send_invalid_request_error( - request_id, - format!("cursor {start} exceeds total apps {total}"), + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + + let accessible_config = config.clone(); + let accessible_tx = tx.clone(); + tokio::spawn(async move { + let result = connectors::list_accessible_connectors_from_mcp_tools_with_options( + &accessible_config, + force_refetch, ) - .await; - return; + .await + .map_err(|err| format!("failed to load accessible apps: {err}")); + let _ = accessible_tx.send(AppListLoadResult::Accessible(result)); + }); + + tokio::spawn(async move { + let result = connectors::list_all_connectors_with_options(&config, force_refetch) + .await + .map_err(|err| format!("failed to list apps: {err}")); + let _ = tx.send(AppListLoadResult::Directory(result)); + }); + + let mut accessible_connectors: Option> = None; + let mut all_connectors: Option> = None; + let app_list_deadline = tokio::time::Instant::now() + APP_LIST_LOAD_TIMEOUT; + + loop { + let result = match tokio::time::timeout_at(app_list_deadline, rx.recv()).await { + Ok(Some(result)) => result, + Ok(None) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: "failed to load app lists".to_string(), + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + } + Err(_) => { + let timeout_seconds = APP_LIST_LOAD_TIMEOUT.as_secs(); + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: format!( + "timed out waiting for app lists after {timeout_seconds} seconds" + ), + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + } + }; + + match result { + AppListLoadResult::Accessible(Ok(connectors)) => { + accessible_connectors = Some(connectors); + } + AppListLoadResult::Accessible(Err(err)) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: err, + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + } + AppListLoadResult::Directory(Ok(connectors)) => { + all_connectors = Some(connectors); + } + AppListLoadResult::Directory(Err(err)) => { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message: err, + data: None, + }; + outgoing.send_error(request_id, error).await; + return; + } + } + + let merged = Self::merge_loaded_apps( + all_connectors.as_deref(), + accessible_connectors.as_deref(), + ); + Self::send_app_list_updated_notification(&outgoing, merged.clone()).await; + + if accessible_connectors.is_some() && all_connectors.is_some() { + match Self::paginate_apps(merged.as_slice(), start, limit) { + Ok(response) => { + outgoing.send_response(request_id, response).await; + return; + } + Err(error) => { + outgoing.send_error(request_id, error).await; + return; + } + } + } + } + } + + fn merge_loaded_apps( + all_connectors: Option<&[AppInfo]>, + accessible_connectors: Option<&[AppInfo]>, + ) -> Vec { + let all = all_connectors.map_or_else(Vec::new, <[AppInfo]>::to_vec); + let accessible = accessible_connectors.map_or_else(Vec::new, <[AppInfo]>::to_vec); + connectors::merge_connectors_with_accessible(all, accessible) + } + + fn paginate_apps( + connectors: &[AppInfo], + start: usize, + limit: Option, + ) -> Result { + let total = connectors.len(); + if start > total { + return Err(JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!("cursor {start} exceeds total apps {total}"), + data: None, + }); } + let effective_limit = limit.unwrap_or(total as u32).max(1) as usize; let end = start.saturating_add(effective_limit).min(total); let data = connectors[start..end].to_vec(); - let next_cursor = if end < total { Some(end.to_string()) } else { None }; - self.outgoing - .send_response(request_id, AppsListResponse { data, next_cursor }) + + Ok(AppsListResponse { data, next_cursor }) + } + + async fn send_app_list_updated_notification( + outgoing: &Arc, + data: Vec, + ) { + outgoing + .send_server_notification(ServerNotification::AppListUpdated( + AppListUpdatedNotification { data }, + )) .await; } diff --git a/codex-rs/app-server/tests/suite/v2/app_list.rs b/codex-rs/app-server/tests/suite/v2/app_list.rs index c6b829f2e..0a5d90af2 100644 --- a/codex-rs/app-server/tests/suite/v2/app_list.rs +++ b/codex-rs/app-server/tests/suite/v2/app_list.rs @@ -3,6 +3,7 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; +use anyhow::bail; use app_test_support::ChatGptAuthFixture; use app_test_support::McpProcess; use app_test_support::to_response; @@ -15,10 +16,13 @@ use axum::http::StatusCode; use axum::http::header::AUTHORIZATION; use axum::routing::get; use codex_app_server_protocol::AppInfo; +use codex_app_server_protocol::AppListUpdatedNotification; use codex_app_server_protocol::AppsListParams; use codex_app_server_protocol::AppsListResponse; +use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; +use codex_app_server_protocol::ServerNotification; use codex_core::auth::AuthCredentialsStoreMode; use pretty_assertions::assert_eq; use rmcp::handler::server::ServerHandler; @@ -51,6 +55,7 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { .send_apps_list_request(AppsListParams { limit: Some(50), cursor: None, + force_refetch: false, }) .await?; @@ -67,6 +72,120 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { Ok(()) } +#[tokio::test] +async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<()> { + let connectors = vec![ + AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: Some("https://example.com/alpha.png".to_string()), + logo_url_dark: None, + distribution_channel: None, + install_url: None, + is_accessible: false, + }, + AppInfo { + id: "beta".to_string(), + name: "beta".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: None, + is_accessible: false, + }, + ]; + + let tools = vec![connector_tool("beta", "Beta App")?]; + let (server_url, server_handle) = start_apps_server_with_delays( + connectors.clone(), + tools, + Duration::from_millis(300), + Duration::ZERO, + ) + .await?; + + let codex_home = TempDir::new()?; + write_connectors_config(codex_home.path(), &server_url)?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + force_refetch: false, + }) + .await?; + + let expected_accessible = vec![AppInfo { + id: "beta".to_string(), + name: "Beta App".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()), + is_accessible: true, + }]; + + let first_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(first_update.data, expected_accessible); + + let expected_merged = vec![ + AppInfo { + id: "beta".to_string(), + name: "Beta App".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), + is_accessible: true, + }, + AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: Some("https://example.com/alpha.png".to_string()), + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), + is_accessible: false, + }, + ]; + + let second_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(second_update.data, expected_merged); + + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + + let AppsListResponse { + data: response_data, + next_cursor, + } = to_response(response)?; + assert_eq!(response_data, expected_merged); + assert!(next_cursor.is_none()); + + server_handle.abort(); + let _ = server_handle.await; + Ok(()) +} + #[tokio::test] async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { let connectors = vec![ @@ -93,7 +212,13 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { ]; let tools = vec![connector_tool("beta", "Beta App")?]; - let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?; + let (server_url, server_handle) = start_apps_server_with_delays( + connectors.clone(), + tools, + Duration::ZERO, + Duration::from_millis(300), + ) + .await?; let codex_home = TempDir::new()?; write_connectors_config(codex_home.path(), &server_url)?; @@ -113,16 +238,36 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { .send_apps_list_request(AppsListParams { limit: None, cursor: None, + force_refetch: false, }) .await?; - let response: JSONRPCResponse = timeout( - DEFAULT_TIMEOUT, - mcp.read_stream_until_response_message(RequestId::Integer(request_id)), - ) - .await??; - - let AppsListResponse { data, next_cursor } = to_response(response)?; + let first_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!( + first_update.data, + vec![ + AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: Some("https://example.com/alpha.png".to_string()), + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), + is_accessible: false, + }, + AppInfo { + id: "beta".to_string(), + name: "beta".to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), + is_accessible: false, + }, + ] + ); let expected = vec![ AppInfo { @@ -147,6 +292,15 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { }, ]; + let second_update = read_app_list_updated_notification(&mut mcp).await?; + assert_eq!(second_update.data, expected); + + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let AppsListResponse { data, next_cursor } = to_response(response)?; assert_eq!(data, expected); assert!(next_cursor.is_none()); @@ -180,7 +334,13 @@ async fn list_apps_paginates_results() -> Result<()> { ]; let tools = vec![connector_tool("beta", "Beta App")?]; - let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?; + let (server_url, server_handle) = start_apps_server_with_delays( + connectors.clone(), + tools, + Duration::ZERO, + Duration::from_millis(300), + ) + .await?; let codex_home = TempDir::new()?; write_connectors_config(codex_home.path(), &server_url)?; @@ -200,6 +360,7 @@ async fn list_apps_paginates_results() -> Result<()> { .send_apps_list_request(AppsListParams { limit: Some(1), cursor: None, + force_refetch: false, }) .await?; let first_response: JSONRPCResponse = timeout( @@ -226,10 +387,18 @@ async fn list_apps_paginates_results() -> Result<()> { assert_eq!(first_page, expected_first); let next_cursor = first_cursor.ok_or_else(|| anyhow::anyhow!("missing cursor"))?; + loop { + let update = read_app_list_updated_notification(&mut mcp).await?; + if update.data.len() == 2 && update.data.iter().any(|connector| connector.is_accessible) { + break; + } + } + let second_request = mcp .send_apps_list_request(AppsListParams { limit: Some(1), cursor: Some(next_cursor), + force_refetch: false, }) .await?; let second_response: JSONRPCResponse = timeout( @@ -260,21 +429,134 @@ async fn list_apps_paginates_results() -> Result<()> { Ok(()) } +#[tokio::test] +async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result<()> { + let connectors = vec![AppInfo { + id: "beta".to_string(), + name: "Beta App".to_string(), + description: Some("Beta connector".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: None, + is_accessible: false, + }]; + let tools = vec![connector_tool("beta", "Beta App")?]; + let (server_url, server_handle) = + start_apps_server_with_delays(connectors, tools, Duration::ZERO, Duration::ZERO).await?; + + let codex_home = TempDir::new()?; + write_connectors_config(codex_home.path(), &server_url)?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let initial_request = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + force_refetch: false, + }) + .await?; + let initial_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(initial_request)), + ) + .await??; + let AppsListResponse { + data: initial_data, + next_cursor: initial_next_cursor, + } = to_response(initial_response)?; + assert!(initial_next_cursor.is_none()); + assert_eq!(initial_data.len(), 1); + assert!(initial_data.iter().all(|app| app.is_accessible)); + + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token-invalid") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let refetch_request = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + force_refetch: true, + }) + .await?; + let refetch_error: JSONRPCError = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(refetch_request)), + ) + .await??; + assert!(refetch_error.error.message.contains("failed to")); + + let cached_request = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + force_refetch: false, + }) + .await?; + let cached_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(cached_request)), + ) + .await??; + let AppsListResponse { + data: cached_data, + next_cursor: cached_next_cursor, + } = to_response(cached_response)?; + + assert_eq!(cached_data, initial_data); + assert!(cached_next_cursor.is_none()); + server_handle.abort(); + Ok(()) +} + +async fn read_app_list_updated_notification( + mcp: &mut McpProcess, +) -> Result { + let notification = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_notification_message("app/list/updated"), + ) + .await??; + let parsed: ServerNotification = notification.try_into()?; + let ServerNotification::AppListUpdated(payload) = parsed else { + bail!("unexpected notification variant"); + }; + Ok(payload) +} + #[derive(Clone)] struct AppsServerState { expected_bearer: String, expected_account_id: String, response: serde_json::Value, + directory_delay: Duration, } #[derive(Clone)] struct AppListMcpServer { tools: Arc>, + tools_delay: Duration, } impl AppListMcpServer { - fn new(tools: Arc>) -> Self { - Self { tools } + fn new(tools: Arc>, tools_delay: Duration) -> Self { + Self { tools, tools_delay } } } @@ -293,7 +575,11 @@ impl ServerHandler for AppListMcpServer { ) -> impl std::future::Future> + Send + '_ { let tools = self.tools.clone(); + let tools_delay = self.tools_delay; async move { + if tools_delay > Duration::ZERO { + tokio::time::sleep(tools_delay).await; + } Ok(ListToolsResult { tools: (*tools).clone(), next_cursor: None, @@ -303,14 +589,17 @@ impl ServerHandler for AppListMcpServer { } } -async fn start_apps_server( +async fn start_apps_server_with_delays( connectors: Vec, tools: Vec, + directory_delay: Duration, + tools_delay: Duration, ) -> Result<(String, JoinHandle<()>)> { let state = AppsServerState { expected_bearer: "Bearer chatgpt-token".to_string(), expected_account_id: "account-123".to_string(), response: json!({ "apps": connectors, "next_token": null }), + directory_delay, }; let state = Arc::new(state); let tools = Arc::new(tools); @@ -321,7 +610,7 @@ async fn start_apps_server( let mcp_service = StreamableHttpService::new( { let tools = tools.clone(); - move || Ok(AppListMcpServer::new(tools.clone())) + move || Ok(AppListMcpServer::new(tools.clone(), tools_delay)) }, Arc::new(LocalSessionManager::default()), StreamableHttpServerConfig::default(), @@ -347,6 +636,10 @@ async fn list_directory_connectors( State(state): State>, headers: HeaderMap, ) -> Result { + if state.directory_delay > Duration::ZERO { + tokio::time::sleep(state.directory_delay).await; + } + let bearer_ok = headers .get(AUTHORIZATION) .and_then(|value| value.to_str().ok()) diff --git a/codex-rs/chatgpt/src/connectors.rs b/codex-rs/chatgpt/src/connectors.rs index 83b727158..396286efd 100644 --- a/codex-rs/chatgpt/src/connectors.rs +++ b/codex-rs/chatgpt/src/connectors.rs @@ -1,18 +1,24 @@ use std::collections::HashMap; +use std::sync::LazyLock; +use std::sync::Mutex as StdMutex; use codex_core::config::Config; use codex_core::features::Feature; +use codex_core::token_data::TokenData; use serde::Deserialize; use std::time::Duration; +use std::time::Instant; use crate::chatgpt_client::chatgpt_get_request_with_timeout; use crate::chatgpt_token::get_chatgpt_token_data; use crate::chatgpt_token::init_chatgpt_token_from_auth; pub use codex_core::connectors::AppInfo; +use codex_core::connectors::CONNECTORS_CACHE_TTL; pub use codex_core::connectors::connector_display_label; use codex_core::connectors::connector_install_url; pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools; +pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options; use codex_core::connectors::merge_connectors; #[derive(Debug, Deserialize)] @@ -38,6 +44,24 @@ struct DirectoryApp { const DIRECTORY_CONNECTORS_TIMEOUT: Duration = Duration::from_secs(60); +#[derive(Clone, PartialEq, Eq)] +struct AllConnectorsCacheKey { + chatgpt_base_url: String, + account_id: Option, + chatgpt_user_id: Option, + is_workspace_account: bool, +} + +#[derive(Clone)] +struct CachedAllConnectors { + key: AllConnectorsCacheKey, + expires_at: Instant, + connectors: Vec, +} + +static ALL_CONNECTORS_CACHE: LazyLock>> = + LazyLock::new(|| StdMutex::new(None)); + pub async fn list_connectors(config: &Config) -> anyhow::Result> { if !config.features.enabled(Feature::Apps) { return Ok(Vec::new()); @@ -48,11 +72,17 @@ pub async fn list_connectors(config: &Config) -> anyhow::Result> { ); let connectors = connectors_result?; let accessible = accessible_result?; - let merged = merge_connectors(connectors, accessible); - Ok(filter_disallowed_connectors(merged)) + Ok(merge_connectors_with_accessible(connectors, accessible)) } pub async fn list_all_connectors(config: &Config) -> anyhow::Result> { + list_all_connectors_with_options(config, false).await +} + +pub async fn list_all_connectors_with_options( + config: &Config, + force_refetch: bool, +) -> anyhow::Result> { if !config.features.enabled(Feature::Apps) { return Ok(Vec::new()); } @@ -61,6 +91,11 @@ pub async fn list_all_connectors(config: &Config) -> anyhow::Result let token_data = get_chatgpt_token_data().ok_or_else(|| anyhow::anyhow!("ChatGPT token not available"))?; + let cache_key = all_connectors_cache_key(config, &token_data); + if !force_refetch && let Some(cached_connectors) = read_cached_all_connectors(&cache_key) { + return Ok(cached_connectors); + } + let mut apps = list_directory_connectors(config).await?; if token_data.id_token.is_workspace_account() { apps.extend(list_workspace_connectors(config).await?); @@ -84,9 +119,56 @@ pub async fn list_all_connectors(config: &Config) -> anyhow::Result .cmp(&right.name) .then_with(|| left.id.cmp(&right.id)) }); + write_cached_all_connectors(cache_key, &connectors); Ok(connectors) } +fn all_connectors_cache_key(config: &Config, token_data: &TokenData) -> AllConnectorsCacheKey { + AllConnectorsCacheKey { + chatgpt_base_url: config.chatgpt_base_url.clone(), + account_id: token_data.account_id.clone(), + chatgpt_user_id: token_data.id_token.chatgpt_user_id.clone(), + is_workspace_account: token_data.id_token.is_workspace_account(), + } +} + +fn read_cached_all_connectors(cache_key: &AllConnectorsCacheKey) -> Option> { + let mut cache_guard = ALL_CONNECTORS_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let now = Instant::now(); + + if let Some(cached) = cache_guard.as_ref() { + if now < cached.expires_at && cached.key == *cache_key { + return Some(cached.connectors.clone()); + } + if now >= cached.expires_at { + *cache_guard = None; + } + } + + None +} + +fn write_cached_all_connectors(cache_key: AllConnectorsCacheKey, connectors: &[AppInfo]) { + let mut cache_guard = ALL_CONNECTORS_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *cache_guard = Some(CachedAllConnectors { + key: cache_key, + expires_at: Instant::now() + CONNECTORS_CACHE_TTL, + connectors: connectors.to_vec(), + }); +} + +pub fn merge_connectors_with_accessible( + connectors: Vec, + accessible_connectors: Vec, +) -> Vec { + let merged = merge_connectors(connectors, accessible_connectors); + filter_disallowed_connectors(merged) +} + async fn list_directory_connectors(config: &Config) -> anyhow::Result> { let mut apps = Vec::new(); let mut next_token: Option = None; diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index afe03fbbe..137b2c005 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -1,6 +1,10 @@ use std::collections::HashMap; use std::env; use std::path::PathBuf; +use std::sync::LazyLock; +use std::sync::Mutex as StdMutex; +use std::time::Duration; +use std::time::Instant; use async_channel::unbounded; pub use codex_app_server_protocol::AppInfo; @@ -8,6 +12,7 @@ use codex_protocol::protocol::SandboxPolicy; use tokio_util::sync::CancellationToken; use crate::AuthManager; +use crate::CodexAuth; use crate::SandboxState; use crate::config::Config; use crate::features::Feature; @@ -16,9 +21,37 @@ use crate::mcp::auth::compute_auth_statuses; use crate::mcp::with_codex_apps_mcp; use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT; use crate::mcp_connection_manager::McpConnectionManager; +use crate::token_data::TokenData; + +pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600); + +#[derive(Clone, PartialEq, Eq)] +struct AccessibleConnectorsCacheKey { + chatgpt_base_url: String, + account_id: Option, + chatgpt_user_id: Option, + is_workspace_account: bool, +} + +#[derive(Clone)] +struct CachedAccessibleConnectors { + key: AccessibleConnectorsCacheKey, + expires_at: Instant, + connectors: Vec, +} + +static ACCESSIBLE_CONNECTORS_CACHE: LazyLock>> = + LazyLock::new(|| StdMutex::new(None)); pub async fn list_accessible_connectors_from_mcp_tools( config: &Config, +) -> anyhow::Result> { + list_accessible_connectors_from_mcp_tools_with_options(config, false).await +} + +pub async fn list_accessible_connectors_from_mcp_tools_with_options( + config: &Config, + force_refetch: bool, ) -> anyhow::Result> { if !config.features.enabled(Feature::Apps) { return Ok(Vec::new()); @@ -26,6 +59,12 @@ pub async fn list_accessible_connectors_from_mcp_tools( let auth_manager = auth_manager_from_config(config); let auth = auth_manager.auth().await; + let cache_key = accessible_connectors_cache_key(config, auth.as_ref()); + if !force_refetch && let Some(cached_connectors) = read_cached_accessible_connectors(&cache_key) + { + return Ok(cached_connectors); + } + let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config); if mcp_servers.is_empty() { return Ok(Vec::new()); @@ -57,17 +96,79 @@ pub async fn list_accessible_connectors_from_mcp_tools( ) .await; - if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) { + let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) { let timeout = cfg.startup_timeout_sec.unwrap_or(DEFAULT_STARTUP_TIMEOUT); mcp_connection_manager .wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout) - .await; - } + .await + } else { + false + }; let tools = mcp_connection_manager.list_all_tools().await; cancel_token.cancel(); - Ok(accessible_connectors_from_mcp_tools(&tools)) + let accessible_connectors = accessible_connectors_from_mcp_tools(&tools); + if codex_apps_ready || !accessible_connectors.is_empty() { + write_cached_accessible_connectors(cache_key, &accessible_connectors); + } + Ok(accessible_connectors) +} + +fn accessible_connectors_cache_key( + config: &Config, + auth: Option<&CodexAuth>, +) -> AccessibleConnectorsCacheKey { + let token_data: Option = auth.and_then(|auth| auth.get_token_data().ok()); + let account_id = token_data + .as_ref() + .and_then(|token_data| token_data.account_id.clone()); + let chatgpt_user_id = token_data + .as_ref() + .and_then(|token_data| token_data.id_token.chatgpt_user_id.clone()); + let is_workspace_account = token_data + .as_ref() + .is_some_and(|token_data| token_data.id_token.is_workspace_account()); + AccessibleConnectorsCacheKey { + chatgpt_base_url: config.chatgpt_base_url.clone(), + account_id, + chatgpt_user_id, + is_workspace_account, + } +} + +fn read_cached_accessible_connectors( + cache_key: &AccessibleConnectorsCacheKey, +) -> Option> { + let mut cache_guard = ACCESSIBLE_CONNECTORS_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let now = Instant::now(); + + if let Some(cached) = cache_guard.as_ref() { + if now < cached.expires_at && cached.key == *cache_key { + return Some(cached.connectors.clone()); + } + if now >= cached.expires_at { + *cache_guard = None; + } + } + + None +} + +fn write_cached_accessible_connectors( + cache_key: AccessibleConnectorsCacheKey, + connectors: &[AppInfo], +) { + let mut cache_guard = ACCESSIBLE_CONNECTORS_CACHE + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + *cache_guard = Some(CachedAccessibleConnectors { + key: cache_key, + expires_at: Instant::now() + CONNECTORS_CACHE_TTL, + connectors: connectors.to_vec(), + }); } fn auth_manager_from_config(config: &Config) -> std::sync::Arc { diff --git a/codex-rs/tui/src/app.rs b/codex-rs/tui/src/app.rs index 4d40c0175..2162e0ff1 100644 --- a/codex-rs/tui/src/app.rs +++ b/codex-rs/tui/src/app.rs @@ -1603,8 +1603,8 @@ impl App { AppEvent::RateLimitSnapshotFetched(snapshot) => { self.chat_widget.on_rate_limit_snapshot(Some(snapshot)); } - AppEvent::ConnectorsLoaded(result) => { - self.chat_widget.on_connectors_loaded(result); + AppEvent::ConnectorsLoaded { result, is_final } => { + self.chat_widget.on_connectors_loaded(result, is_final); } AppEvent::UpdateReasoningEffort(effort) => { self.on_update_reasoning_effort(effort); diff --git a/codex-rs/tui/src/app_event.rs b/codex-rs/tui/src/app_event.rs index bd48e5de1..b48960bbf 100644 --- a/codex-rs/tui/src/app_event.rs +++ b/codex-rs/tui/src/app_event.rs @@ -97,7 +97,10 @@ pub(crate) enum AppEvent { RateLimitSnapshotFetched(RateLimitSnapshot), /// Result of prefetching connectors. - ConnectorsLoaded(Result), + ConnectorsLoaded { + result: Result, + is_final: bool, + }, /// Result of computing a `/diff` command. DiffResult(String), diff --git a/codex-rs/tui/src/bottom_pane/bottom_pane_view.rs b/codex-rs/tui/src/bottom_pane/bottom_pane_view.rs index 039a9ca05..9c40adc69 100644 --- a/codex-rs/tui/src/bottom_pane/bottom_pane_view.rs +++ b/codex-rs/tui/src/bottom_pane/bottom_pane_view.rs @@ -16,6 +16,11 @@ pub(crate) trait BottomPaneView: Renderable { false } + /// Stable identifier for views that need external refreshes while open. + fn view_id(&self) -> Option<&'static str> { + None + } + /// Handle Ctrl-C while this view is active. fn on_ctrl_c(&mut self) -> CancellationEvent { CancellationEvent::NotHandled diff --git a/codex-rs/tui/src/bottom_pane/chat_composer.rs b/codex-rs/tui/src/bottom_pane/chat_composer.rs index 60fea9b08..58b6b5611 100644 --- a/codex-rs/tui/src/bottom_pane/chat_composer.rs +++ b/codex-rs/tui/src/bottom_pane/chat_composer.rs @@ -421,6 +421,7 @@ impl ChatComposer { pub fn set_connector_mentions(&mut self, connectors_snapshot: Option) { self.connectors_snapshot = connectors_snapshot; + self.sync_popups(); } pub(crate) fn take_mention_bindings(&mut self) -> Vec { @@ -4269,6 +4270,43 @@ mod tests { assert_ne!(composer.footer_mode, FooterMode::ShortcutOverlay); } + #[test] + fn set_connector_mentions_refreshes_open_mention_popup() { + let (tx, _rx) = unbounded_channel::(); + let sender = AppEventSender::new(tx); + let mut composer = ChatComposer::new( + true, + sender, + false, + "Ask Codex to do anything".to_string(), + false, + ); + composer.set_connectors_enabled(true); + composer.set_text_content("$".to_string(), Vec::new(), Vec::new()); + assert!(matches!(composer.active_popup, ActivePopup::None)); + + let connectors = vec![AppInfo { + id: "connector_1".to_string(), + name: "Notion".to_string(), + description: Some("Workspace docs".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://example.test/notion".to_string()), + is_accessible: true, + }]; + composer.set_connector_mentions(Some(ConnectorsSnapshot { connectors })); + + let ActivePopup::Skill(popup) = &composer.active_popup else { + panic!("expected mention popup to open after connectors update"); + }; + let mention = popup + .selected_mention() + .expect("expected connector mention to be selected"); + assert_eq!(mention.insert_text, "$notion".to_string()); + assert_eq!(mention.path, Some("app://connector_1".to_string())); + } + #[test] fn shortcut_overlay_persists_while_task_running() { use crossterm::event::KeyCode; diff --git a/codex-rs/tui/src/bottom_pane/list_selection_view.rs b/codex-rs/tui/src/bottom_pane/list_selection_view.rs index 3cf74e3a1..8f20f2bc6 100644 --- a/codex-rs/tui/src/bottom_pane/list_selection_view.rs +++ b/codex-rs/tui/src/bottom_pane/list_selection_view.rs @@ -68,6 +68,7 @@ pub(crate) struct SelectionItem { /// `AutoAllRows` measures all rows to ensure stable column widths as the user scrolls /// `Fixed` used a fixed 30/70 split between columns pub(crate) struct SelectionViewParams { + pub view_id: Option<&'static str>, pub title: Option, pub subtitle: Option, pub footer_note: Option>, @@ -83,6 +84,7 @@ pub(crate) struct SelectionViewParams { impl Default for SelectionViewParams { fn default() -> Self { Self { + view_id: None, title: None, subtitle: None, footer_note: None, @@ -103,6 +105,7 @@ impl Default for SelectionViewParams { /// visible rows and source items and for preserving selection while filters /// change. pub(crate) struct ListSelectionView { + view_id: Option<&'static str>, footer_note: Option>, footer_hint: Option>, items: Vec, @@ -139,6 +142,7 @@ impl ListSelectionView { ])); } let mut s = Self { + view_id: params.view_id, footer_note: params.footer_note, footer_hint: params.footer_hint, items: params.items, @@ -460,6 +464,10 @@ impl BottomPaneView for ListSelectionView { self.complete } + fn view_id(&self) -> Option<&'static str> { + self.view_id + } + fn on_ctrl_c(&mut self) -> CancellationEvent { self.complete = true; CancellationEvent::Handled diff --git a/codex-rs/tui/src/bottom_pane/mod.rs b/codex-rs/tui/src/bottom_pane/mod.rs index 960e67fed..226e91abb 100644 --- a/codex-rs/tui/src/bottom_pane/mod.rs +++ b/codex-rs/tui/src/bottom_pane/mod.rs @@ -658,6 +658,26 @@ impl BottomPane { self.push_view(Box::new(view)); } + /// Replace the active selection view when it matches `view_id`. + pub(crate) fn replace_selection_view_if_active( + &mut self, + view_id: &'static str, + params: list_selection_view::SelectionViewParams, + ) -> bool { + let is_match = self + .view_stack + .last() + .is_some_and(|view| view.view_id() == Some(view_id)); + if !is_match { + return false; + } + + self.view_stack.pop(); + let view = list_selection_view::ListSelectionView::new(params, self.app_event_tx.clone()); + self.push_view(Box::new(view)); + true + } + /// Update the queued messages preview shown above the composer. pub(crate) fn set_queued_user_messages(&mut self, queued: Vec) { self.queued_user_messages.messages = queued; diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 9746a0974..0f98e0f49 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -145,12 +145,14 @@ use ratatui::widgets::Wrap; use tokio::sync::mpsc::UnboundedSender; use tokio::task::JoinHandle; use tracing::debug; +use tracing::warn; const DEFAULT_MODEL_DISPLAY_NAME: &str = "loading"; const PLAN_IMPLEMENTATION_TITLE: &str = "Implement this plan?"; const PLAN_IMPLEMENTATION_YES: &str = "Yes, implement this plan"; const PLAN_IMPLEMENTATION_NO: &str = "No, stay in Plan mode"; const PLAN_IMPLEMENTATION_CODING_MESSAGE: &str = "Implement the plan."; +const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection"; use crate::app_event::AppEvent; use crate::app_event::ConnectorsSnapshot; @@ -539,6 +541,7 @@ pub(crate) struct ChatWidget { /// currently executing. mcp_startup_status: Option>, connectors_cache: ConnectorsCacheState, + connectors_prefetch_in_flight: bool, // Queue of interruptive UI events deferred during an active write cycle interrupts: InterruptManager, // Accumulates the current reasoning block text to extract a header @@ -1013,7 +1016,6 @@ impl ChatWidget { self.bottom_pane .set_history_metadata(event.history_log_id, event.history_entry_count); self.set_skills(None); - self.bottom_pane.set_connectors_snapshot(None); self.thread_id = Some(event.session_id); self.thread_name = event.thread_name.clone(); self.forked_from = event.forked_from_id; @@ -2619,6 +2621,7 @@ impl ChatWidget { agent_turn_running: false, mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), + connectors_prefetch_in_flight: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -2782,6 +2785,7 @@ impl ChatWidget { agent_turn_running: false, mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), + connectors_prefetch_in_flight: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -2934,6 +2938,7 @@ impl ChatWidget { agent_turn_running: false, mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), + connectors_prefetch_in_flight: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -4470,24 +4475,52 @@ impl ChatWidget { } fn prefetch_connectors(&mut self) { - if !self.connectors_enabled() { - return; - } - if matches!(self.connectors_cache, ConnectorsCacheState::Loading) { + if !self.connectors_enabled() || self.connectors_prefetch_in_flight { return; } - self.connectors_cache = ConnectorsCacheState::Loading; + self.connectors_prefetch_in_flight = true; + if !matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) { + self.connectors_cache = ConnectorsCacheState::Loading; + } + let config = self.config.clone(); let app_event_tx = self.app_event_tx.clone(); tokio::spawn(async move { - let result: Result = async { - let connectors = connectors::list_connectors(&config).await?; + let accessible_connectors = + match connectors::list_accessible_connectors_from_mcp_tools(&config).await { + Ok(connectors) => connectors, + Err(err) => { + app_event_tx.send(AppEvent::ConnectorsLoaded { + result: Err(format!("Failed to load apps: {err}")), + is_final: true, + }); + return; + } + }; + + app_event_tx.send(AppEvent::ConnectorsLoaded { + result: Ok(ConnectorsSnapshot { + connectors: accessible_connectors.clone(), + }), + is_final: false, + }); + + let result: Result = async { + let all_connectors = connectors::list_all_connectors(&config).await?; + let connectors = connectors::merge_connectors_with_accessible( + all_connectors, + accessible_connectors, + ); Ok(ConnectorsSnapshot { connectors }) } - .await; - let result = result.map_err(|err| format!("Failed to load apps: {err}")); - app_event_tx.send(AppEvent::ConnectorsLoaded(result)); + .await + .map_err(|err: anyhow::Error| format!("Failed to load apps: {err}")); + + app_event_tx.send(AppEvent::ConnectorsLoaded { + result, + is_final: true, + }); }); } @@ -6345,6 +6378,11 @@ impl ChatWidget { } fn open_connectors_popup(&mut self, connectors: &[connectors::AppInfo]) { + self.bottom_pane + .show_selection_view(self.connectors_popup_params(connectors)); + } + + fn connectors_popup_params(&self, connectors: &[connectors::AppInfo]) -> SelectionViewParams { let total = connectors.len(); let installed = connectors .iter() @@ -6412,7 +6450,8 @@ impl ChatWidget { items.push(item); } - self.bottom_pane.show_selection_view(SelectionViewParams { + SelectionViewParams { + view_id: Some(CONNECTORS_SELECTION_VIEW_ID), header: Box::new(header), footer_hint: Some(Self::connectors_popup_hint_line()), items, @@ -6420,7 +6459,14 @@ impl ChatWidget { search_placeholder: Some("Type to search apps".to_string()), col_width_mode: ColumnWidthMode::AutoAllRows, ..Default::default() - }); + } + } + + fn refresh_connectors_popup_if_open(&mut self, connectors: &[connectors::AppInfo]) { + let _ = self.bottom_pane.replace_selection_view_if_active( + CONNECTORS_SELECTION_VIEW_ID, + self.connectors_popup_params(connectors), + ); } fn connectors_popup_hint_line() -> Line<'static> { @@ -6659,16 +6705,30 @@ impl ChatWidget { self.set_skills_from_response(&ev); } - pub(crate) fn on_connectors_loaded(&mut self, result: Result) { - self.connectors_cache = match result { - Ok(connectors) => ConnectorsCacheState::Ready(connectors), - Err(err) => ConnectorsCacheState::Failed(err), - }; - if let ConnectorsCacheState::Ready(snapshot) = &self.connectors_cache { - self.bottom_pane - .set_connectors_snapshot(Some(snapshot.clone())); - } else { - self.bottom_pane.set_connectors_snapshot(None); + pub(crate) fn on_connectors_loaded( + &mut self, + result: Result, + is_final: bool, + ) { + if is_final { + self.connectors_prefetch_in_flight = false; + } + + match result { + Ok(snapshot) => { + self.refresh_connectors_popup_if_open(&snapshot.connectors); + self.connectors_cache = ConnectorsCacheState::Ready(snapshot.clone()); + self.bottom_pane.set_connectors_snapshot(Some(snapshot)); + } + Err(err) => { + if matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) { + warn!("failed to refresh apps list; retaining current apps snapshot: {err}"); + return; + } + + self.connectors_cache = ConnectorsCacheState::Failed(err); + self.bottom_pane.set_connectors_snapshot(None); + } } } diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index bf173ceca..43d2ddcb8 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -1074,6 +1074,7 @@ async fn make_chatwidget_manual( agent_turn_running: false, mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), + connectors_prefetch_in_flight: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -3667,6 +3668,74 @@ fn render_bottom_popup(chat: &ChatWidget, width: u16) -> String { lines.join("\n") } +#[tokio::test] +async fn apps_popup_refreshes_when_connectors_snapshot_updates() { + let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await; + chat.config.features.enable(Feature::Apps); + chat.bottom_pane.set_connectors_enabled(true); + + chat.on_connectors_loaded( + Ok(ConnectorsSnapshot { + connectors: vec![codex_chatgpt::connectors::AppInfo { + id: "connector_1".to_string(), + name: "Notion".to_string(), + description: Some("Workspace docs".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://example.test/notion".to_string()), + is_accessible: true, + }], + }), + false, + ); + chat.add_connectors_output(); + + let before = render_bottom_popup(&chat, 80); + assert!( + before.contains("Installed 1 of 1 available apps."), + "expected initial apps popup snapshot, got:\n{before}" + ); + + chat.on_connectors_loaded( + Ok(ConnectorsSnapshot { + connectors: vec![ + codex_chatgpt::connectors::AppInfo { + id: "connector_1".to_string(), + name: "Notion".to_string(), + description: Some("Workspace docs".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://example.test/notion".to_string()), + is_accessible: true, + }, + codex_chatgpt::connectors::AppInfo { + id: "connector_2".to_string(), + name: "Linear".to_string(), + description: Some("Project tracking".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: Some("https://example.test/linear".to_string()), + is_accessible: true, + }, + ], + }), + true, + ); + + let after = render_bottom_popup(&chat, 80); + assert!( + after.contains("Installed 2 of 2 available apps."), + "expected refreshed apps popup snapshot, got:\n{after}" + ); + assert!( + after.contains("Linear"), + "expected refreshed popup to include new connector, got:\n{after}" + ); +} + #[tokio::test] async fn experimental_features_popup_snapshot() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;