[apps] Improve app loading. (#10994)

There are two concepts of apps that we load in the harness:

- Directory apps, which is all the apps that the user can install.
- Accessible apps, which is what the user actually installed and can be
$ inserted and be used by the model. These are extracted from the tools
that are loaded through the gateway MCP.

Previously we wait for both sets of apps before returning the full apps
list. Which causes many issues because accessible apps won't be
available to the UI or the model if directory apps aren't loaded or
failed to load.

In this PR we are separating them so that accessible apps can be loaded
separately and are instantly available to be shown in the UI and to be
provided in model context. We also added an app-server event so that
clients can subscribe to also get accessible apps without being blocked
on the full app list.

- [x] Separate accessible apps and directory apps loading.
- [x] `app/list` request will also emit `app/list/updated` notifications
that app-server clients can subscribe. Which allows clients to get
accessible apps list to render in the $ menu without being blocked by
directory apps.
- [x] Cache both accessible and directory apps with 1 hour TTL to avoid
reloading them when creating new threads.
- [x] TUI improvements to redraw $ menu and /apps menu when app list is
updated.
This commit is contained in:
Matthew Zeng 2026-02-08 15:24:56 -08:00 committed by GitHub
parent 181b721ba5
commit 45b7763c3f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
27 changed files with 1164 additions and 87 deletions

View file

@ -21,6 +21,7 @@
"type": "object"
},
"AppsListParams": {
"description": "EXPERIMENTAL - list available apps/connectors.",
"properties": {
"cursor": {
"description": "Opaque pagination cursor returned by a previous call.",
@ -29,6 +30,10 @@
"null"
]
},
"forceRefetch": {
"description": "When true, bypass app caches and fetch the latest data from sources.",
"type": "boolean"
},
"limit": {
"description": "Optional page size; defaults to a reasonable server-side value.",
"format": "uint32",

View file

@ -165,6 +165,71 @@
}
]
},
"AppInfo": {
"description": "EXPERIMENTAL - app metadata returned by app-list APIs.",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"distributionChannel": {
"type": [
"string",
"null"
]
},
"id": {
"type": "string"
},
"installUrl": {
"type": [
"string",
"null"
]
},
"isAccessible": {
"default": false,
"type": "boolean"
},
"logoUrl": {
"type": [
"string",
"null"
]
},
"logoUrlDark": {
"type": [
"string",
"null"
]
},
"name": {
"type": "string"
}
},
"required": [
"id",
"name"
],
"type": "object"
},
"AppListUpdatedNotification": {
"description": "EXPERIMENTAL - notification emitted when the app list changes.",
"properties": {
"data": {
"items": {
"$ref": "#/definitions/AppInfo"
},
"type": "array"
}
},
"required": [
"data"
],
"type": "object"
},
"AskForApproval": {
"description": "Determines the conditions under which the user is consulted to approve running the command proposed by Codex.",
"oneOf": [
@ -7886,6 +7951,26 @@
"title": "Account/rateLimits/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"app/list/updated"
],
"title": "App/list/updatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/AppListUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "App/list/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {

View file

@ -8192,6 +8192,26 @@
"title": "Account/rateLimits/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {
"enum": [
"app/list/updated"
],
"title": "App/list/updatedNotificationMethod",
"type": "string"
},
"params": {
"$ref": "#/definitions/v2/AppListUpdatedNotification"
}
},
"required": [
"method",
"params"
],
"title": "App/list/updatedNotification",
"type": "object"
},
{
"properties": {
"method": {
@ -9965,6 +9985,7 @@
"type": "string"
},
"AppInfo": {
"description": "EXPERIMENTAL - app metadata returned by app-list APIs.",
"properties": {
"description": {
"type": [
@ -10013,11 +10034,29 @@
],
"type": "object"
},
"AppListUpdatedNotification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "EXPERIMENTAL - notification emitted when the app list changes.",
"properties": {
"data": {
"items": {
"$ref": "#/definitions/v2/AppInfo"
},
"type": "array"
}
},
"required": [
"data"
],
"title": "AppListUpdatedNotification",
"type": "object"
},
"AppsConfig": {
"type": "object"
},
"AppsListParams": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "EXPERIMENTAL - list available apps/connectors.",
"properties": {
"cursor": {
"description": "Opaque pagination cursor returned by a previous call.",
@ -10026,6 +10065,10 @@
"null"
]
},
"forceRefetch": {
"description": "When true, bypass app caches and fetch the latest data from sources.",
"type": "boolean"
},
"limit": {
"description": "Optional page size; defaults to a reasonable server-side value.",
"format": "uint32",
@ -10041,6 +10084,7 @@
},
"AppsListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "EXPERIMENTAL - app list response.",
"properties": {
"data": {
"items": {

View file

@ -0,0 +1,69 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AppInfo": {
"description": "EXPERIMENTAL - app metadata returned by app-list APIs.",
"properties": {
"description": {
"type": [
"string",
"null"
]
},
"distributionChannel": {
"type": [
"string",
"null"
]
},
"id": {
"type": "string"
},
"installUrl": {
"type": [
"string",
"null"
]
},
"isAccessible": {
"default": false,
"type": "boolean"
},
"logoUrl": {
"type": [
"string",
"null"
]
},
"logoUrlDark": {
"type": [
"string",
"null"
]
},
"name": {
"type": "string"
}
},
"required": [
"id",
"name"
],
"type": "object"
}
},
"description": "EXPERIMENTAL - notification emitted when the app list changes.",
"properties": {
"data": {
"items": {
"$ref": "#/definitions/AppInfo"
},
"type": "array"
}
},
"required": [
"data"
],
"title": "AppListUpdatedNotification",
"type": "object"
}

View file

@ -1,5 +1,6 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"description": "EXPERIMENTAL - list available apps/connectors.",
"properties": {
"cursor": {
"description": "Opaque pagination cursor returned by a previous call.",
@ -8,6 +9,10 @@
"null"
]
},
"forceRefetch": {
"description": "When true, bypass app caches and fetch the latest data from sources.",
"type": "boolean"
},
"limit": {
"description": "Optional page size; defaults to a reasonable server-side value.",
"format": "uint32",

View file

@ -2,6 +2,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"AppInfo": {
"description": "EXPERIMENTAL - app metadata returned by app-list APIs.",
"properties": {
"description": {
"type": [
@ -51,6 +52,7 @@
"type": "object"
}
},
"description": "EXPERIMENTAL - app list response.",
"properties": {
"data": {
"items": {

View file

@ -8,6 +8,7 @@ import type { AccountLoginCompletedNotification } from "./v2/AccountLoginComplet
import type { AccountRateLimitsUpdatedNotification } from "./v2/AccountRateLimitsUpdatedNotification";
import type { AccountUpdatedNotification } from "./v2/AccountUpdatedNotification";
import type { AgentMessageDeltaNotification } from "./v2/AgentMessageDeltaNotification";
import type { AppListUpdatedNotification } from "./v2/AppListUpdatedNotification";
import type { CommandExecutionOutputDeltaNotification } from "./v2/CommandExecutionOutputDeltaNotification";
import type { ConfigWarningNotification } from "./v2/ConfigWarningNotification";
import type { ContextCompactedNotification } from "./v2/ContextCompactedNotification";
@ -36,4 +37,4 @@ import type { WindowsWorldWritableWarningNotification } from "./v2/WindowsWorldW
/**
* Notification sent from the server to the client.
*/
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };
export type ServerNotification = { "method": "error", "params": ErrorNotification } | { "method": "thread/started", "params": ThreadStartedNotification } | { "method": "thread/name/updated", "params": ThreadNameUpdatedNotification } | { "method": "thread/tokenUsage/updated", "params": ThreadTokenUsageUpdatedNotification } | { "method": "turn/started", "params": TurnStartedNotification } | { "method": "turn/completed", "params": TurnCompletedNotification } | { "method": "turn/diff/updated", "params": TurnDiffUpdatedNotification } | { "method": "turn/plan/updated", "params": TurnPlanUpdatedNotification } | { "method": "item/started", "params": ItemStartedNotification } | { "method": "item/completed", "params": ItemCompletedNotification } | { "method": "rawResponseItem/completed", "params": RawResponseItemCompletedNotification } | { "method": "item/agentMessage/delta", "params": AgentMessageDeltaNotification } | { "method": "item/plan/delta", "params": PlanDeltaNotification } | { "method": "item/commandExecution/outputDelta", "params": CommandExecutionOutputDeltaNotification } | { "method": "item/commandExecution/terminalInteraction", "params": TerminalInteractionNotification } | { "method": "item/fileChange/outputDelta", "params": FileChangeOutputDeltaNotification } | { "method": "item/mcpToolCall/progress", "params": McpToolCallProgressNotification } | { "method": "mcpServer/oauthLogin/completed", "params": McpServerOauthLoginCompletedNotification } | { "method": "account/updated", "params": AccountUpdatedNotification } | { "method": "account/rateLimits/updated", "params": AccountRateLimitsUpdatedNotification } | { "method": "app/list/updated", "params": AppListUpdatedNotification } | { "method": "item/reasoning/summaryTextDelta", "params": ReasoningSummaryTextDeltaNotification } | { "method": "item/reasoning/summaryPartAdded", "params": ReasoningSummaryPartAddedNotification } | { "method": "item/reasoning/textDelta", "params": ReasoningTextDeltaNotification } | { "method": "thread/compacted", "params": ContextCompactedNotification } | { "method": "deprecationNotice", "params": DeprecationNoticeNotification } | { "method": "configWarning", "params": ConfigWarningNotification } | { "method": "windows/worldWritableWarning", "params": WindowsWorldWritableWarningNotification } | { "method": "account/login/completed", "params": AccountLoginCompletedNotification } | { "method": "authStatusChange", "params": AuthStatusChangeNotification } | { "method": "loginChatGptComplete", "params": LoginChatGptCompleteNotification } | { "method": "sessionConfigured", "params": SessionConfiguredNotification };

View file

@ -2,4 +2,7 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* EXPERIMENTAL - app metadata returned by app-list APIs.
*/
export type AppInfo = { id: string, name: string, description: string | null, logoUrl: string | null, logoUrlDark: string | null, distributionChannel: string | null, installUrl: string | null, isAccessible: boolean, };

View file

@ -0,0 +1,9 @@
// GENERATED CODE! DO NOT MODIFY BY HAND!
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AppInfo } from "./AppInfo";
/**
* EXPERIMENTAL - notification emitted when the app list changes.
*/
export type AppListUpdatedNotification = { data: Array<AppInfo>, };

View file

@ -2,6 +2,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
/**
* EXPERIMENTAL - list available apps/connectors.
*/
export type AppsListParams = {
/**
* Opaque pagination cursor returned by a previous call.
@ -10,4 +13,8 @@ cursor?: string | null,
/**
* Optional page size; defaults to a reasonable server-side value.
*/
limit?: number | null, };
limit?: number | null,
/**
* When true, bypass app caches and fetch the latest data from sources.
*/
forceRefetch?: boolean, };

View file

@ -3,6 +3,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { AppInfo } from "./AppInfo";
/**
* EXPERIMENTAL - app list response.
*/
export type AppsListResponse = { data: Array<AppInfo>,
/**
* Opaque cursor to pass to the next call to continue after the last item.

View file

@ -8,6 +8,7 @@ export type { AgentMessageDeltaNotification } from "./AgentMessageDeltaNotificat
export type { AnalyticsConfig } from "./AnalyticsConfig";
export type { AppDisabledReason } from "./AppDisabledReason";
export type { AppInfo } from "./AppInfo";
export type { AppListUpdatedNotification } from "./AppListUpdatedNotification";
export type { AppsConfig } from "./AppsConfig";
export type { AppsListParams } from "./AppsListParams";
export type { AppsListResponse } from "./AppsListResponse";

View file

@ -721,6 +721,7 @@ server_notification_definitions! {
McpServerOauthLoginCompleted => "mcpServer/oauthLogin/completed" (v2::McpServerOauthLoginCompletedNotification),
AccountUpdated => "account/updated" (v2::AccountUpdatedNotification),
AccountRateLimitsUpdated => "account/rateLimits/updated" (v2::AccountRateLimitsUpdatedNotification),
AppListUpdated => "app/list/updated" (v2::AppListUpdatedNotification),
ReasoningSummaryTextDelta => "item/reasoning/summaryTextDelta" (v2::ReasoningSummaryTextDeltaNotification),
ReasoningSummaryPartAdded => "item/reasoning/summaryPartAdded" (v2::ReasoningSummaryPartAddedNotification),
ReasoningTextDelta => "item/reasoning/textDelta" (v2::ReasoningTextDeltaNotification),

View file

@ -1193,6 +1193,7 @@ pub struct ListMcpServerStatusResponse {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
/// EXPERIMENTAL - list available apps/connectors.
pub struct AppsListParams {
/// Opaque pagination cursor returned by a previous call.
#[ts(optional = nullable)]
@ -1200,11 +1201,15 @@ pub struct AppsListParams {
/// Optional page size; defaults to a reasonable server-side value.
#[ts(optional = nullable)]
pub limit: Option<u32>,
/// When true, bypass app caches and fetch the latest data from sources.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub force_refetch: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
/// EXPERIMENTAL - app metadata returned by app-list APIs.
pub struct AppInfo {
pub id: String,
pub name: String,
@ -1220,6 +1225,7 @@ pub struct AppInfo {
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
/// EXPERIMENTAL - app list response.
pub struct AppsListResponse {
pub data: Vec<AppInfo>,
/// Opaque cursor to pass to the next call to continue after the last item.
@ -1227,6 +1233,14 @@ pub struct AppsListResponse {
pub next_cursor: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]
/// EXPERIMENTAL - notification emitted when the app list changes.
pub struct AppListUpdatedNotification {
pub data: Vec<AppInfo>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export_to = "v2/")]

View file

@ -700,7 +700,8 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat
```json
{ "method": "app/list", "id": 50, "params": {
"cursor": null,
"limit": 50
"limit": 50,
"forceRefetch": false
} }
{ "id": 50, "result": {
"data": [
@ -719,6 +720,30 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat
} }
```
`app/list` returns after both accessible apps and directory apps are loaded. Set `forceRefetch: true` to bypass app caches and fetch fresh data from sources. Cache entries are only replaced when those refetches succeed.
The server also emits `app/list/updated` notifications whenever either source (accessible apps or directory apps) finishes loading. Each notification includes the latest merged app list.
```json
{
"method": "app/list/updated",
"params": {
"data": [
{
"id": "demo-app",
"name": "Demo App",
"description": "Example connector for documentation.",
"logoUrl": "https://example.com/demo-app.png",
"logoUrlDark": null,
"distributionChannel": null,
"installUrl": "https://chatgpt.com/apps/demo-app/demo-app",
"isAccessible": true
}
]
}
}
```
Invoke an app by inserting `$<app-slug>` in the text input. The slug is derived from the app name and lowercased with non-alphanumeric characters replaced by `-` (for example, "Demo App" becomes `$demo-app`). Add a `mention` input item (recommended) so the server uses the exact `app://<connector-id>` path rather than guessing by name.
Example:

View file

@ -15,6 +15,8 @@ use codex_app_server_protocol::AccountLoginCompletedNotification;
use codex_app_server_protocol::AccountUpdatedNotification;
use codex_app_server_protocol::AddConversationListenerParams;
use codex_app_server_protocol::AddConversationSubscriptionResponse;
use codex_app_server_protocol::AppInfo;
use codex_app_server_protocol::AppListUpdatedNotification;
use codex_app_server_protocol::AppsListParams;
use codex_app_server_protocol::AppsListResponse;
use codex_app_server_protocol::ArchiveConversationParams;
@ -269,6 +271,7 @@ const THREAD_LIST_MAX_LIMIT: usize = 100;
// Duration before a ChatGPT login attempt is abandoned.
const LOGIN_CHATGPT_TIMEOUT: Duration = Duration::from_secs(10 * 60);
const APP_LIST_LOAD_TIMEOUT: Duration = Duration::from_secs(90);
struct ActiveLogin {
shutdown_handle: ShutdownHandle,
login_id: Uuid,
@ -279,6 +282,11 @@ enum CancelLoginError {
NotFound(Uuid),
}
enum AppListLoadResult {
Accessible(Result<Vec<AppInfo>, String>),
Directory(Result<Vec<AppInfo>, String>),
}
impl Drop for ActiveLogin {
fn drop(&mut self) {
self.shutdown_handle.shutdown();
@ -4324,7 +4332,6 @@ impl CodexMessageProcessor {
}
async fn apps_list(&self, request_id: ConnectionRequestId, params: AppsListParams) {
let AppsListParams { cursor, limit } = params;
let config = match self.load_latest_config().await {
Ok(config) => config,
Err(error) => {
@ -4346,65 +4353,182 @@ impl CodexMessageProcessor {
return;
}
let connectors = match connectors::list_connectors(&config).await {
Ok(connectors) => connectors,
Err(err) => {
self.send_internal_error(request_id, format!("failed to list apps: {err}"))
.await;
return;
}
};
let request = request_id.clone();
let outgoing = Arc::clone(&self.outgoing);
tokio::spawn(async move {
Self::apps_list_task(outgoing, request, params, config).await;
});
}
let total = connectors.len();
if total == 0 {
self.outgoing
.send_response(
request_id,
AppsListResponse {
data: Vec::new(),
next_cursor: None,
},
)
.await;
return;
}
let effective_limit = limit.unwrap_or(total as u32).max(1) as usize;
let effective_limit = effective_limit.min(total);
async fn apps_list_task(
outgoing: Arc<OutgoingMessageSender>,
request_id: ConnectionRequestId,
params: AppsListParams,
config: Config,
) {
let AppsListParams {
cursor,
limit,
force_refetch,
} = params;
let start = match cursor {
Some(cursor) => match cursor.parse::<usize>() {
Ok(idx) => idx,
Err(_) => {
self.send_invalid_request_error(
request_id,
format!("invalid cursor: {cursor}"),
)
.await;
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid cursor: {cursor}"),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
},
None => 0,
};
if start > total {
self.send_invalid_request_error(
request_id,
format!("cursor {start} exceeds total apps {total}"),
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let accessible_config = config.clone();
let accessible_tx = tx.clone();
tokio::spawn(async move {
let result = connectors::list_accessible_connectors_from_mcp_tools_with_options(
&accessible_config,
force_refetch,
)
.await;
return;
.await
.map_err(|err| format!("failed to load accessible apps: {err}"));
let _ = accessible_tx.send(AppListLoadResult::Accessible(result));
});
tokio::spawn(async move {
let result = connectors::list_all_connectors_with_options(&config, force_refetch)
.await
.map_err(|err| format!("failed to list apps: {err}"));
let _ = tx.send(AppListLoadResult::Directory(result));
});
let mut accessible_connectors: Option<Vec<AppInfo>> = None;
let mut all_connectors: Option<Vec<AppInfo>> = None;
let app_list_deadline = tokio::time::Instant::now() + APP_LIST_LOAD_TIMEOUT;
loop {
let result = match tokio::time::timeout_at(app_list_deadline, rx.recv()).await {
Ok(Some(result)) => result,
Ok(None) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: "failed to load app lists".to_string(),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
Err(_) => {
let timeout_seconds = APP_LIST_LOAD_TIMEOUT.as_secs();
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"timed out waiting for app lists after {timeout_seconds} seconds"
),
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
};
match result {
AppListLoadResult::Accessible(Ok(connectors)) => {
accessible_connectors = Some(connectors);
}
AppListLoadResult::Accessible(Err(err)) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: err,
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
AppListLoadResult::Directory(Ok(connectors)) => {
all_connectors = Some(connectors);
}
AppListLoadResult::Directory(Err(err)) => {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: err,
data: None,
};
outgoing.send_error(request_id, error).await;
return;
}
}
let merged = Self::merge_loaded_apps(
all_connectors.as_deref(),
accessible_connectors.as_deref(),
);
Self::send_app_list_updated_notification(&outgoing, merged.clone()).await;
if accessible_connectors.is_some() && all_connectors.is_some() {
match Self::paginate_apps(merged.as_slice(), start, limit) {
Ok(response) => {
outgoing.send_response(request_id, response).await;
return;
}
Err(error) => {
outgoing.send_error(request_id, error).await;
return;
}
}
}
}
}
fn merge_loaded_apps(
all_connectors: Option<&[AppInfo]>,
accessible_connectors: Option<&[AppInfo]>,
) -> Vec<AppInfo> {
let all = all_connectors.map_or_else(Vec::new, <[AppInfo]>::to_vec);
let accessible = accessible_connectors.map_or_else(Vec::new, <[AppInfo]>::to_vec);
connectors::merge_connectors_with_accessible(all, accessible)
}
fn paginate_apps(
connectors: &[AppInfo],
start: usize,
limit: Option<u32>,
) -> Result<AppsListResponse, JSONRPCErrorError> {
let total = connectors.len();
if start > total {
return Err(JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("cursor {start} exceeds total apps {total}"),
data: None,
});
}
let effective_limit = limit.unwrap_or(total as u32).max(1) as usize;
let end = start.saturating_add(effective_limit).min(total);
let data = connectors[start..end].to_vec();
let next_cursor = if end < total {
Some(end.to_string())
} else {
None
};
self.outgoing
.send_response(request_id, AppsListResponse { data, next_cursor })
Ok(AppsListResponse { data, next_cursor })
}
async fn send_app_list_updated_notification(
outgoing: &Arc<OutgoingMessageSender>,
data: Vec<AppInfo>,
) {
outgoing
.send_server_notification(ServerNotification::AppListUpdated(
AppListUpdatedNotification { data },
))
.await;
}

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use anyhow::bail;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
@ -15,10 +16,13 @@ use axum::http::StatusCode;
use axum::http::header::AUTHORIZATION;
use axum::routing::get;
use codex_app_server_protocol::AppInfo;
use codex_app_server_protocol::AppListUpdatedNotification;
use codex_app_server_protocol::AppsListParams;
use codex_app_server_protocol::AppsListResponse;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ServerNotification;
use codex_core::auth::AuthCredentialsStoreMode;
use pretty_assertions::assert_eq;
use rmcp::handler::server::ServerHandler;
@ -51,6 +55,7 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> {
.send_apps_list_request(AppsListParams {
limit: Some(50),
cursor: None,
force_refetch: false,
})
.await?;
@ -67,6 +72,120 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<()> {
let connectors = vec![
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
logo_url_dark: None,
distribution_channel: None,
install_url: None,
is_accessible: false,
},
AppInfo {
id: "beta".to_string(),
name: "beta".to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: None,
is_accessible: false,
},
];
let tools = vec![connector_tool("beta", "Beta App")?];
let (server_url, server_handle) = start_apps_server_with_delays(
connectors.clone(),
tools,
Duration::from_millis(300),
Duration::ZERO,
)
.await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_apps_list_request(AppsListParams {
limit: None,
cursor: None,
force_refetch: false,
})
.await?;
let expected_accessible = vec![AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://chatgpt.com/apps/beta-app/beta".to_string()),
is_accessible: true,
}];
let first_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(first_update.data, expected_accessible);
let expected_merged = vec![
AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()),
is_accessible: true,
},
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
},
];
let second_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(second_update.data, expected_merged);
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let AppsListResponse {
data: response_data,
next_cursor,
} = to_response(response)?;
assert_eq!(response_data, expected_merged);
assert!(next_cursor.is_none());
server_handle.abort();
let _ = server_handle.await;
Ok(())
}
#[tokio::test]
async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
let connectors = vec![
@ -93,7 +212,13 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
];
let tools = vec![connector_tool("beta", "Beta App")?];
let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?;
let (server_url, server_handle) = start_apps_server_with_delays(
connectors.clone(),
tools,
Duration::ZERO,
Duration::from_millis(300),
)
.await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
@ -113,16 +238,36 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
.send_apps_list_request(AppsListParams {
limit: None,
cursor: None,
force_refetch: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let AppsListResponse { data, next_cursor } = to_response(response)?;
let first_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(
first_update.data,
vec![
AppInfo {
id: "alpha".to_string(),
name: "Alpha".to_string(),
description: Some("Alpha connector".to_string()),
logo_url: Some("https://example.com/alpha.png".to_string()),
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()),
is_accessible: false,
},
AppInfo {
id: "beta".to_string(),
name: "beta".to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()),
is_accessible: false,
},
]
);
let expected = vec![
AppInfo {
@ -147,6 +292,15 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> {
},
];
let second_update = read_app_list_updated_notification(&mut mcp).await?;
assert_eq!(second_update.data, expected);
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let AppsListResponse { data, next_cursor } = to_response(response)?;
assert_eq!(data, expected);
assert!(next_cursor.is_none());
@ -180,7 +334,13 @@ async fn list_apps_paginates_results() -> Result<()> {
];
let tools = vec![connector_tool("beta", "Beta App")?];
let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?;
let (server_url, server_handle) = start_apps_server_with_delays(
connectors.clone(),
tools,
Duration::ZERO,
Duration::from_millis(300),
)
.await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
@ -200,6 +360,7 @@ async fn list_apps_paginates_results() -> Result<()> {
.send_apps_list_request(AppsListParams {
limit: Some(1),
cursor: None,
force_refetch: false,
})
.await?;
let first_response: JSONRPCResponse = timeout(
@ -226,10 +387,18 @@ async fn list_apps_paginates_results() -> Result<()> {
assert_eq!(first_page, expected_first);
let next_cursor = first_cursor.ok_or_else(|| anyhow::anyhow!("missing cursor"))?;
loop {
let update = read_app_list_updated_notification(&mut mcp).await?;
if update.data.len() == 2 && update.data.iter().any(|connector| connector.is_accessible) {
break;
}
}
let second_request = mcp
.send_apps_list_request(AppsListParams {
limit: Some(1),
cursor: Some(next_cursor),
force_refetch: false,
})
.await?;
let second_response: JSONRPCResponse = timeout(
@ -260,21 +429,134 @@ async fn list_apps_paginates_results() -> Result<()> {
Ok(())
}
#[tokio::test]
async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result<()> {
let connectors = vec![AppInfo {
id: "beta".to_string(),
name: "Beta App".to_string(),
description: Some("Beta connector".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: None,
is_accessible: false,
}];
let tools = vec![connector_tool("beta", "Beta App")?];
let (server_url, server_handle) =
start_apps_server_with_delays(connectors, tools, Duration::ZERO, Duration::ZERO).await?;
let codex_home = TempDir::new()?;
write_connectors_config(codex_home.path(), &server_url)?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let initial_request = mcp
.send_apps_list_request(AppsListParams {
limit: None,
cursor: None,
force_refetch: false,
})
.await?;
let initial_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(initial_request)),
)
.await??;
let AppsListResponse {
data: initial_data,
next_cursor: initial_next_cursor,
} = to_response(initial_response)?;
assert!(initial_next_cursor.is_none());
assert_eq!(initial_data.len(), 1);
assert!(initial_data.iter().all(|app| app.is_accessible));
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token-invalid")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
let refetch_request = mcp
.send_apps_list_request(AppsListParams {
limit: None,
cursor: None,
force_refetch: true,
})
.await?;
let refetch_error: JSONRPCError = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(refetch_request)),
)
.await??;
assert!(refetch_error.error.message.contains("failed to"));
let cached_request = mcp
.send_apps_list_request(AppsListParams {
limit: None,
cursor: None,
force_refetch: false,
})
.await?;
let cached_response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(cached_request)),
)
.await??;
let AppsListResponse {
data: cached_data,
next_cursor: cached_next_cursor,
} = to_response(cached_response)?;
assert_eq!(cached_data, initial_data);
assert!(cached_next_cursor.is_none());
server_handle.abort();
Ok(())
}
async fn read_app_list_updated_notification(
mcp: &mut McpProcess,
) -> Result<AppListUpdatedNotification> {
let notification = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_notification_message("app/list/updated"),
)
.await??;
let parsed: ServerNotification = notification.try_into()?;
let ServerNotification::AppListUpdated(payload) = parsed else {
bail!("unexpected notification variant");
};
Ok(payload)
}
#[derive(Clone)]
struct AppsServerState {
expected_bearer: String,
expected_account_id: String,
response: serde_json::Value,
directory_delay: Duration,
}
#[derive(Clone)]
struct AppListMcpServer {
tools: Arc<Vec<Tool>>,
tools_delay: Duration,
}
impl AppListMcpServer {
fn new(tools: Arc<Vec<Tool>>) -> Self {
Self { tools }
fn new(tools: Arc<Vec<Tool>>, tools_delay: Duration) -> Self {
Self { tools, tools_delay }
}
}
@ -293,7 +575,11 @@ impl ServerHandler for AppListMcpServer {
) -> impl std::future::Future<Output = Result<ListToolsResult, rmcp::ErrorData>> + Send + '_
{
let tools = self.tools.clone();
let tools_delay = self.tools_delay;
async move {
if tools_delay > Duration::ZERO {
tokio::time::sleep(tools_delay).await;
}
Ok(ListToolsResult {
tools: (*tools).clone(),
next_cursor: None,
@ -303,14 +589,17 @@ impl ServerHandler for AppListMcpServer {
}
}
async fn start_apps_server(
async fn start_apps_server_with_delays(
connectors: Vec<AppInfo>,
tools: Vec<Tool>,
directory_delay: Duration,
tools_delay: Duration,
) -> Result<(String, JoinHandle<()>)> {
let state = AppsServerState {
expected_bearer: "Bearer chatgpt-token".to_string(),
expected_account_id: "account-123".to_string(),
response: json!({ "apps": connectors, "next_token": null }),
directory_delay,
};
let state = Arc::new(state);
let tools = Arc::new(tools);
@ -321,7 +610,7 @@ async fn start_apps_server(
let mcp_service = StreamableHttpService::new(
{
let tools = tools.clone();
move || Ok(AppListMcpServer::new(tools.clone()))
move || Ok(AppListMcpServer::new(tools.clone(), tools_delay))
},
Arc::new(LocalSessionManager::default()),
StreamableHttpServerConfig::default(),
@ -347,6 +636,10 @@ async fn list_directory_connectors(
State(state): State<Arc<AppsServerState>>,
headers: HeaderMap,
) -> Result<impl axum::response::IntoResponse, StatusCode> {
if state.directory_delay > Duration::ZERO {
tokio::time::sleep(state.directory_delay).await;
}
let bearer_ok = headers
.get(AUTHORIZATION)
.and_then(|value| value.to_str().ok())

View file

@ -1,18 +1,24 @@
use std::collections::HashMap;
use std::sync::LazyLock;
use std::sync::Mutex as StdMutex;
use codex_core::config::Config;
use codex_core::features::Feature;
use codex_core::token_data::TokenData;
use serde::Deserialize;
use std::time::Duration;
use std::time::Instant;
use crate::chatgpt_client::chatgpt_get_request_with_timeout;
use crate::chatgpt_token::get_chatgpt_token_data;
use crate::chatgpt_token::init_chatgpt_token_from_auth;
pub use codex_core::connectors::AppInfo;
use codex_core::connectors::CONNECTORS_CACHE_TTL;
pub use codex_core::connectors::connector_display_label;
use codex_core::connectors::connector_install_url;
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools;
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options;
use codex_core::connectors::merge_connectors;
#[derive(Debug, Deserialize)]
@ -38,6 +44,24 @@ struct DirectoryApp {
const DIRECTORY_CONNECTORS_TIMEOUT: Duration = Duration::from_secs(60);
#[derive(Clone, PartialEq, Eq)]
struct AllConnectorsCacheKey {
chatgpt_base_url: String,
account_id: Option<String>,
chatgpt_user_id: Option<String>,
is_workspace_account: bool,
}
#[derive(Clone)]
struct CachedAllConnectors {
key: AllConnectorsCacheKey,
expires_at: Instant,
connectors: Vec<AppInfo>,
}
static ALL_CONNECTORS_CACHE: LazyLock<StdMutex<Option<CachedAllConnectors>>> =
LazyLock::new(|| StdMutex::new(None));
pub async fn list_connectors(config: &Config) -> anyhow::Result<Vec<AppInfo>> {
if !config.features.enabled(Feature::Apps) {
return Ok(Vec::new());
@ -48,11 +72,17 @@ pub async fn list_connectors(config: &Config) -> anyhow::Result<Vec<AppInfo>> {
);
let connectors = connectors_result?;
let accessible = accessible_result?;
let merged = merge_connectors(connectors, accessible);
Ok(filter_disallowed_connectors(merged))
Ok(merge_connectors_with_accessible(connectors, accessible))
}
pub async fn list_all_connectors(config: &Config) -> anyhow::Result<Vec<AppInfo>> {
list_all_connectors_with_options(config, false).await
}
pub async fn list_all_connectors_with_options(
config: &Config,
force_refetch: bool,
) -> anyhow::Result<Vec<AppInfo>> {
if !config.features.enabled(Feature::Apps) {
return Ok(Vec::new());
}
@ -61,6 +91,11 @@ pub async fn list_all_connectors(config: &Config) -> anyhow::Result<Vec<AppInfo>
let token_data =
get_chatgpt_token_data().ok_or_else(|| anyhow::anyhow!("ChatGPT token not available"))?;
let cache_key = all_connectors_cache_key(config, &token_data);
if !force_refetch && let Some(cached_connectors) = read_cached_all_connectors(&cache_key) {
return Ok(cached_connectors);
}
let mut apps = list_directory_connectors(config).await?;
if token_data.id_token.is_workspace_account() {
apps.extend(list_workspace_connectors(config).await?);
@ -84,9 +119,56 @@ pub async fn list_all_connectors(config: &Config) -> anyhow::Result<Vec<AppInfo>
.cmp(&right.name)
.then_with(|| left.id.cmp(&right.id))
});
write_cached_all_connectors(cache_key, &connectors);
Ok(connectors)
}
fn all_connectors_cache_key(config: &Config, token_data: &TokenData) -> AllConnectorsCacheKey {
AllConnectorsCacheKey {
chatgpt_base_url: config.chatgpt_base_url.clone(),
account_id: token_data.account_id.clone(),
chatgpt_user_id: token_data.id_token.chatgpt_user_id.clone(),
is_workspace_account: token_data.id_token.is_workspace_account(),
}
}
fn read_cached_all_connectors(cache_key: &AllConnectorsCacheKey) -> Option<Vec<AppInfo>> {
let mut cache_guard = ALL_CONNECTORS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let now = Instant::now();
if let Some(cached) = cache_guard.as_ref() {
if now < cached.expires_at && cached.key == *cache_key {
return Some(cached.connectors.clone());
}
if now >= cached.expires_at {
*cache_guard = None;
}
}
None
}
fn write_cached_all_connectors(cache_key: AllConnectorsCacheKey, connectors: &[AppInfo]) {
let mut cache_guard = ALL_CONNECTORS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*cache_guard = Some(CachedAllConnectors {
key: cache_key,
expires_at: Instant::now() + CONNECTORS_CACHE_TTL,
connectors: connectors.to_vec(),
});
}
pub fn merge_connectors_with_accessible(
connectors: Vec<AppInfo>,
accessible_connectors: Vec<AppInfo>,
) -> Vec<AppInfo> {
let merged = merge_connectors(connectors, accessible_connectors);
filter_disallowed_connectors(merged)
}
async fn list_directory_connectors(config: &Config) -> anyhow::Result<Vec<DirectoryApp>> {
let mut apps = Vec::new();
let mut next_token: Option<String> = None;

View file

@ -1,6 +1,10 @@
use std::collections::HashMap;
use std::env;
use std::path::PathBuf;
use std::sync::LazyLock;
use std::sync::Mutex as StdMutex;
use std::time::Duration;
use std::time::Instant;
use async_channel::unbounded;
pub use codex_app_server_protocol::AppInfo;
@ -8,6 +12,7 @@ use codex_protocol::protocol::SandboxPolicy;
use tokio_util::sync::CancellationToken;
use crate::AuthManager;
use crate::CodexAuth;
use crate::SandboxState;
use crate::config::Config;
use crate::features::Feature;
@ -16,9 +21,37 @@ use crate::mcp::auth::compute_auth_statuses;
use crate::mcp::with_codex_apps_mcp;
use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::token_data::TokenData;
pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600);
#[derive(Clone, PartialEq, Eq)]
struct AccessibleConnectorsCacheKey {
chatgpt_base_url: String,
account_id: Option<String>,
chatgpt_user_id: Option<String>,
is_workspace_account: bool,
}
#[derive(Clone)]
struct CachedAccessibleConnectors {
key: AccessibleConnectorsCacheKey,
expires_at: Instant,
connectors: Vec<AppInfo>,
}
static ACCESSIBLE_CONNECTORS_CACHE: LazyLock<StdMutex<Option<CachedAccessibleConnectors>>> =
LazyLock::new(|| StdMutex::new(None));
pub async fn list_accessible_connectors_from_mcp_tools(
config: &Config,
) -> anyhow::Result<Vec<AppInfo>> {
list_accessible_connectors_from_mcp_tools_with_options(config, false).await
}
pub async fn list_accessible_connectors_from_mcp_tools_with_options(
config: &Config,
force_refetch: bool,
) -> anyhow::Result<Vec<AppInfo>> {
if !config.features.enabled(Feature::Apps) {
return Ok(Vec::new());
@ -26,6 +59,12 @@ pub async fn list_accessible_connectors_from_mcp_tools(
let auth_manager = auth_manager_from_config(config);
let auth = auth_manager.auth().await;
let cache_key = accessible_connectors_cache_key(config, auth.as_ref());
if !force_refetch && let Some(cached_connectors) = read_cached_accessible_connectors(&cache_key)
{
return Ok(cached_connectors);
}
let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config);
if mcp_servers.is_empty() {
return Ok(Vec::new());
@ -57,17 +96,79 @@ pub async fn list_accessible_connectors_from_mcp_tools(
)
.await;
if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) {
let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) {
let timeout = cfg.startup_timeout_sec.unwrap_or(DEFAULT_STARTUP_TIMEOUT);
mcp_connection_manager
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout)
.await;
}
.await
} else {
false
};
let tools = mcp_connection_manager.list_all_tools().await;
cancel_token.cancel();
Ok(accessible_connectors_from_mcp_tools(&tools))
let accessible_connectors = accessible_connectors_from_mcp_tools(&tools);
if codex_apps_ready || !accessible_connectors.is_empty() {
write_cached_accessible_connectors(cache_key, &accessible_connectors);
}
Ok(accessible_connectors)
}
fn accessible_connectors_cache_key(
config: &Config,
auth: Option<&CodexAuth>,
) -> AccessibleConnectorsCacheKey {
let token_data: Option<TokenData> = auth.and_then(|auth| auth.get_token_data().ok());
let account_id = token_data
.as_ref()
.and_then(|token_data| token_data.account_id.clone());
let chatgpt_user_id = token_data
.as_ref()
.and_then(|token_data| token_data.id_token.chatgpt_user_id.clone());
let is_workspace_account = token_data
.as_ref()
.is_some_and(|token_data| token_data.id_token.is_workspace_account());
AccessibleConnectorsCacheKey {
chatgpt_base_url: config.chatgpt_base_url.clone(),
account_id,
chatgpt_user_id,
is_workspace_account,
}
}
fn read_cached_accessible_connectors(
cache_key: &AccessibleConnectorsCacheKey,
) -> Option<Vec<AppInfo>> {
let mut cache_guard = ACCESSIBLE_CONNECTORS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let now = Instant::now();
if let Some(cached) = cache_guard.as_ref() {
if now < cached.expires_at && cached.key == *cache_key {
return Some(cached.connectors.clone());
}
if now >= cached.expires_at {
*cache_guard = None;
}
}
None
}
fn write_cached_accessible_connectors(
cache_key: AccessibleConnectorsCacheKey,
connectors: &[AppInfo],
) {
let mut cache_guard = ACCESSIBLE_CONNECTORS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*cache_guard = Some(CachedAccessibleConnectors {
key: cache_key,
expires_at: Instant::now() + CONNECTORS_CACHE_TTL,
connectors: connectors.to_vec(),
});
}
fn auth_manager_from_config(config: &Config) -> std::sync::Arc<AuthManager> {

View file

@ -1603,8 +1603,8 @@ impl App {
AppEvent::RateLimitSnapshotFetched(snapshot) => {
self.chat_widget.on_rate_limit_snapshot(Some(snapshot));
}
AppEvent::ConnectorsLoaded(result) => {
self.chat_widget.on_connectors_loaded(result);
AppEvent::ConnectorsLoaded { result, is_final } => {
self.chat_widget.on_connectors_loaded(result, is_final);
}
AppEvent::UpdateReasoningEffort(effort) => {
self.on_update_reasoning_effort(effort);

View file

@ -97,7 +97,10 @@ pub(crate) enum AppEvent {
RateLimitSnapshotFetched(RateLimitSnapshot),
/// Result of prefetching connectors.
ConnectorsLoaded(Result<ConnectorsSnapshot, String>),
ConnectorsLoaded {
result: Result<ConnectorsSnapshot, String>,
is_final: bool,
},
/// Result of computing a `/diff` command.
DiffResult(String),

View file

@ -16,6 +16,11 @@ pub(crate) trait BottomPaneView: Renderable {
false
}
/// Stable identifier for views that need external refreshes while open.
fn view_id(&self) -> Option<&'static str> {
None
}
/// Handle Ctrl-C while this view is active.
fn on_ctrl_c(&mut self) -> CancellationEvent {
CancellationEvent::NotHandled

View file

@ -421,6 +421,7 @@ impl ChatComposer {
pub fn set_connector_mentions(&mut self, connectors_snapshot: Option<ConnectorsSnapshot>) {
self.connectors_snapshot = connectors_snapshot;
self.sync_popups();
}
pub(crate) fn take_mention_bindings(&mut self) -> Vec<MentionBinding> {
@ -4269,6 +4270,43 @@ mod tests {
assert_ne!(composer.footer_mode, FooterMode::ShortcutOverlay);
}
#[test]
fn set_connector_mentions_refreshes_open_mention_popup() {
let (tx, _rx) = unbounded_channel::<AppEvent>();
let sender = AppEventSender::new(tx);
let mut composer = ChatComposer::new(
true,
sender,
false,
"Ask Codex to do anything".to_string(),
false,
);
composer.set_connectors_enabled(true);
composer.set_text_content("$".to_string(), Vec::new(), Vec::new());
assert!(matches!(composer.active_popup, ActivePopup::None));
let connectors = vec![AppInfo {
id: "connector_1".to_string(),
name: "Notion".to_string(),
description: Some("Workspace docs".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://example.test/notion".to_string()),
is_accessible: true,
}];
composer.set_connector_mentions(Some(ConnectorsSnapshot { connectors }));
let ActivePopup::Skill(popup) = &composer.active_popup else {
panic!("expected mention popup to open after connectors update");
};
let mention = popup
.selected_mention()
.expect("expected connector mention to be selected");
assert_eq!(mention.insert_text, "$notion".to_string());
assert_eq!(mention.path, Some("app://connector_1".to_string()));
}
#[test]
fn shortcut_overlay_persists_while_task_running() {
use crossterm::event::KeyCode;

View file

@ -68,6 +68,7 @@ pub(crate) struct SelectionItem {
/// `AutoAllRows` measures all rows to ensure stable column widths as the user scrolls
/// `Fixed` used a fixed 30/70 split between columns
pub(crate) struct SelectionViewParams {
pub view_id: Option<&'static str>,
pub title: Option<String>,
pub subtitle: Option<String>,
pub footer_note: Option<Line<'static>>,
@ -83,6 +84,7 @@ pub(crate) struct SelectionViewParams {
impl Default for SelectionViewParams {
fn default() -> Self {
Self {
view_id: None,
title: None,
subtitle: None,
footer_note: None,
@ -103,6 +105,7 @@ impl Default for SelectionViewParams {
/// visible rows and source items and for preserving selection while filters
/// change.
pub(crate) struct ListSelectionView {
view_id: Option<&'static str>,
footer_note: Option<Line<'static>>,
footer_hint: Option<Line<'static>>,
items: Vec<SelectionItem>,
@ -139,6 +142,7 @@ impl ListSelectionView {
]));
}
let mut s = Self {
view_id: params.view_id,
footer_note: params.footer_note,
footer_hint: params.footer_hint,
items: params.items,
@ -460,6 +464,10 @@ impl BottomPaneView for ListSelectionView {
self.complete
}
fn view_id(&self) -> Option<&'static str> {
self.view_id
}
fn on_ctrl_c(&mut self) -> CancellationEvent {
self.complete = true;
CancellationEvent::Handled

View file

@ -658,6 +658,26 @@ impl BottomPane {
self.push_view(Box::new(view));
}
/// Replace the active selection view when it matches `view_id`.
pub(crate) fn replace_selection_view_if_active(
&mut self,
view_id: &'static str,
params: list_selection_view::SelectionViewParams,
) -> bool {
let is_match = self
.view_stack
.last()
.is_some_and(|view| view.view_id() == Some(view_id));
if !is_match {
return false;
}
self.view_stack.pop();
let view = list_selection_view::ListSelectionView::new(params, self.app_event_tx.clone());
self.push_view(Box::new(view));
true
}
/// Update the queued messages preview shown above the composer.
pub(crate) fn set_queued_user_messages(&mut self, queued: Vec<String>) {
self.queued_user_messages.messages = queued;

View file

@ -145,12 +145,14 @@ use ratatui::widgets::Wrap;
use tokio::sync::mpsc::UnboundedSender;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::warn;
const DEFAULT_MODEL_DISPLAY_NAME: &str = "loading";
const PLAN_IMPLEMENTATION_TITLE: &str = "Implement this plan?";
const PLAN_IMPLEMENTATION_YES: &str = "Yes, implement this plan";
const PLAN_IMPLEMENTATION_NO: &str = "No, stay in Plan mode";
const PLAN_IMPLEMENTATION_CODING_MESSAGE: &str = "Implement the plan.";
const CONNECTORS_SELECTION_VIEW_ID: &str = "connectors-selection";
use crate::app_event::AppEvent;
use crate::app_event::ConnectorsSnapshot;
@ -539,6 +541,7 @@ pub(crate) struct ChatWidget {
/// currently executing.
mcp_startup_status: Option<HashMap<String, McpStartupStatus>>,
connectors_cache: ConnectorsCacheState,
connectors_prefetch_in_flight: bool,
// Queue of interruptive UI events deferred during an active write cycle
interrupts: InterruptManager,
// Accumulates the current reasoning block text to extract a header
@ -1013,7 +1016,6 @@ impl ChatWidget {
self.bottom_pane
.set_history_metadata(event.history_log_id, event.history_entry_count);
self.set_skills(None);
self.bottom_pane.set_connectors_snapshot(None);
self.thread_id = Some(event.session_id);
self.thread_name = event.thread_name.clone();
self.forked_from = event.forked_from_id;
@ -2619,6 +2621,7 @@ impl ChatWidget {
agent_turn_running: false,
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
connectors_prefetch_in_flight: false,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
@ -2782,6 +2785,7 @@ impl ChatWidget {
agent_turn_running: false,
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
connectors_prefetch_in_flight: false,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
@ -2934,6 +2938,7 @@ impl ChatWidget {
agent_turn_running: false,
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
connectors_prefetch_in_flight: false,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
@ -4470,24 +4475,52 @@ impl ChatWidget {
}
fn prefetch_connectors(&mut self) {
if !self.connectors_enabled() {
return;
}
if matches!(self.connectors_cache, ConnectorsCacheState::Loading) {
if !self.connectors_enabled() || self.connectors_prefetch_in_flight {
return;
}
self.connectors_cache = ConnectorsCacheState::Loading;
self.connectors_prefetch_in_flight = true;
if !matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) {
self.connectors_cache = ConnectorsCacheState::Loading;
}
let config = self.config.clone();
let app_event_tx = self.app_event_tx.clone();
tokio::spawn(async move {
let result: Result<ConnectorsSnapshot, anyhow::Error> = async {
let connectors = connectors::list_connectors(&config).await?;
let accessible_connectors =
match connectors::list_accessible_connectors_from_mcp_tools(&config).await {
Ok(connectors) => connectors,
Err(err) => {
app_event_tx.send(AppEvent::ConnectorsLoaded {
result: Err(format!("Failed to load apps: {err}")),
is_final: true,
});
return;
}
};
app_event_tx.send(AppEvent::ConnectorsLoaded {
result: Ok(ConnectorsSnapshot {
connectors: accessible_connectors.clone(),
}),
is_final: false,
});
let result: Result<ConnectorsSnapshot, String> = async {
let all_connectors = connectors::list_all_connectors(&config).await?;
let connectors = connectors::merge_connectors_with_accessible(
all_connectors,
accessible_connectors,
);
Ok(ConnectorsSnapshot { connectors })
}
.await;
let result = result.map_err(|err| format!("Failed to load apps: {err}"));
app_event_tx.send(AppEvent::ConnectorsLoaded(result));
.await
.map_err(|err: anyhow::Error| format!("Failed to load apps: {err}"));
app_event_tx.send(AppEvent::ConnectorsLoaded {
result,
is_final: true,
});
});
}
@ -6345,6 +6378,11 @@ impl ChatWidget {
}
fn open_connectors_popup(&mut self, connectors: &[connectors::AppInfo]) {
self.bottom_pane
.show_selection_view(self.connectors_popup_params(connectors));
}
fn connectors_popup_params(&self, connectors: &[connectors::AppInfo]) -> SelectionViewParams {
let total = connectors.len();
let installed = connectors
.iter()
@ -6412,7 +6450,8 @@ impl ChatWidget {
items.push(item);
}
self.bottom_pane.show_selection_view(SelectionViewParams {
SelectionViewParams {
view_id: Some(CONNECTORS_SELECTION_VIEW_ID),
header: Box::new(header),
footer_hint: Some(Self::connectors_popup_hint_line()),
items,
@ -6420,7 +6459,14 @@ impl ChatWidget {
search_placeholder: Some("Type to search apps".to_string()),
col_width_mode: ColumnWidthMode::AutoAllRows,
..Default::default()
});
}
}
fn refresh_connectors_popup_if_open(&mut self, connectors: &[connectors::AppInfo]) {
let _ = self.bottom_pane.replace_selection_view_if_active(
CONNECTORS_SELECTION_VIEW_ID,
self.connectors_popup_params(connectors),
);
}
fn connectors_popup_hint_line() -> Line<'static> {
@ -6659,16 +6705,30 @@ impl ChatWidget {
self.set_skills_from_response(&ev);
}
pub(crate) fn on_connectors_loaded(&mut self, result: Result<ConnectorsSnapshot, String>) {
self.connectors_cache = match result {
Ok(connectors) => ConnectorsCacheState::Ready(connectors),
Err(err) => ConnectorsCacheState::Failed(err),
};
if let ConnectorsCacheState::Ready(snapshot) = &self.connectors_cache {
self.bottom_pane
.set_connectors_snapshot(Some(snapshot.clone()));
} else {
self.bottom_pane.set_connectors_snapshot(None);
pub(crate) fn on_connectors_loaded(
&mut self,
result: Result<ConnectorsSnapshot, String>,
is_final: bool,
) {
if is_final {
self.connectors_prefetch_in_flight = false;
}
match result {
Ok(snapshot) => {
self.refresh_connectors_popup_if_open(&snapshot.connectors);
self.connectors_cache = ConnectorsCacheState::Ready(snapshot.clone());
self.bottom_pane.set_connectors_snapshot(Some(snapshot));
}
Err(err) => {
if matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) {
warn!("failed to refresh apps list; retaining current apps snapshot: {err}");
return;
}
self.connectors_cache = ConnectorsCacheState::Failed(err);
self.bottom_pane.set_connectors_snapshot(None);
}
}
}

View file

@ -1074,6 +1074,7 @@ async fn make_chatwidget_manual(
agent_turn_running: false,
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
connectors_prefetch_in_flight: false,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
@ -3667,6 +3668,74 @@ fn render_bottom_popup(chat: &ChatWidget, width: u16) -> String {
lines.join("\n")
}
#[tokio::test]
async fn apps_popup_refreshes_when_connectors_snapshot_updates() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.config.features.enable(Feature::Apps);
chat.bottom_pane.set_connectors_enabled(true);
chat.on_connectors_loaded(
Ok(ConnectorsSnapshot {
connectors: vec![codex_chatgpt::connectors::AppInfo {
id: "connector_1".to_string(),
name: "Notion".to_string(),
description: Some("Workspace docs".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://example.test/notion".to_string()),
is_accessible: true,
}],
}),
false,
);
chat.add_connectors_output();
let before = render_bottom_popup(&chat, 80);
assert!(
before.contains("Installed 1 of 1 available apps."),
"expected initial apps popup snapshot, got:\n{before}"
);
chat.on_connectors_loaded(
Ok(ConnectorsSnapshot {
connectors: vec![
codex_chatgpt::connectors::AppInfo {
id: "connector_1".to_string(),
name: "Notion".to_string(),
description: Some("Workspace docs".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://example.test/notion".to_string()),
is_accessible: true,
},
codex_chatgpt::connectors::AppInfo {
id: "connector_2".to_string(),
name: "Linear".to_string(),
description: Some("Project tracking".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: Some("https://example.test/linear".to_string()),
is_accessible: true,
},
],
}),
true,
);
let after = render_bottom_popup(&chat, 80);
assert!(
after.contains("Installed 2 of 2 available apps."),
"expected refreshed apps popup snapshot, got:\n{after}"
);
assert!(
after.contains("Linear"),
"expected refreshed popup to include new connector, got:\n{after}"
);
}
#[tokio::test]
async fn experimental_features_popup_snapshot() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;