From 18bd6d2d71f0055d08ee9856de469fcfd28899cf Mon Sep 17 00:00:00 2001 From: Matthew Zeng Date: Thu, 19 Feb 2026 22:06:51 -0800 Subject: [PATCH] [apps] Store apps tool cache in disk to reduce startup time. (#11822) We now write MCP tools from installed apps to disk cache so that they can be picked up instantly at startup. We still do a fresh fetch from remote MCP server but it's non blocking unless there's a cache miss. - [x] Store apps tool cache in disk to reduce startup time. --- codex-rs/chatgpt/src/connectors.rs | 64 +- codex-rs/core/src/codex.rs | 5 + codex-rs/core/src/connectors.rs | 194 ++++- codex-rs/core/src/mcp/mod.rs | 3 + codex-rs/core/src/mcp_connection_manager.rs | 745 +++++++++++++++----- codex-rs/tui/src/chatwidget.rs | 44 +- codex-rs/tui/src/chatwidget/tests.rs | 37 + 7 files changed, 848 insertions(+), 244 deletions(-) diff --git a/codex-rs/chatgpt/src/connectors.rs b/codex-rs/chatgpt/src/connectors.rs index 6dc2161f0..27f2f1de9 100644 --- a/codex-rs/chatgpt/src/connectors.rs +++ b/codex-rs/chatgpt/src/connectors.rs @@ -4,8 +4,6 @@ use std::sync::LazyLock; use std::sync::Mutex as StdMutex; use codex_core::config::Config; -use codex_core::default_client::is_first_party_chat_originator; -use codex_core::default_client::originator; use codex_core::features::Feature; use codex_core::token_data::TokenData; use serde::Deserialize; @@ -22,8 +20,10 @@ use codex_core::connectors::AppMetadata; use codex_core::connectors::CONNECTORS_CACHE_TTL; pub use codex_core::connectors::connector_display_label; use codex_core::connectors::connector_install_url; +use codex_core::connectors::filter_disallowed_connectors; pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools; pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options; +pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options_and_status; pub use codex_core::connectors::list_cached_accessible_connectors_from_mcp_tools; use codex_core::connectors::merge_connectors; pub use codex_core::connectors::with_app_enabled_state; @@ -106,7 +106,7 @@ pub async fn list_cached_all_connectors(config: &Config) -> Option> } let token_data = get_chatgpt_token_data()?; let cache_key = all_connectors_cache_key(config, &token_data); - read_cached_all_connectors(&cache_key) + read_cached_all_connectors(&cache_key).map(filter_disallowed_connectors) } pub async fn list_all_connectors_with_options( @@ -123,7 +123,7 @@ pub async fn list_all_connectors_with_options( get_chatgpt_token_data().ok_or_else(|| anyhow::anyhow!("ChatGPT token not available"))?; let cache_key = all_connectors_cache_key(config, &token_data); if !force_refetch && let Some(cached_connectors) = read_cached_all_connectors(&cache_key) { - return Ok(cached_connectors); + return Ok(filter_disallowed_connectors(cached_connectors)); } let mut apps = list_directory_connectors(config).await?; @@ -149,6 +149,7 @@ pub async fn list_all_connectors_with_options( .cmp(&right.name) .then_with(|| left.id.cmp(&right.id)) }); + let connectors = filter_disallowed_connectors(connectors); write_cached_all_connectors(cache_key, &connectors); Ok(connectors) } @@ -453,45 +454,6 @@ fn normalize_connector_value(value: Option<&str>) -> Option { .filter(|value| !value.is_empty()) .map(str::to_string) } - -const DISALLOWED_CONNECTOR_IDS: &[&str] = &[ - "asdk_app_6938a94a61d881918ef32cb999ff937c", - "connector_2b0a9009c9c64bf9933a3dae3f2b1254", - "connector_68de829bf7648191acd70a907364c67c", - "connector_68e004f14af881919eb50893d3d9f523", - "connector_69272cb413a081919685ec3c88d1744e", - "connector_0f9c9d4592e54d0a9a12b3f44a1e2010", -]; -const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] = - &["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"]; -const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_"; - -fn filter_disallowed_connectors(connectors: Vec) -> Vec { - filter_disallowed_connectors_for_originator(connectors, originator().value.as_str()) -} - -fn filter_disallowed_connectors_for_originator( - connectors: Vec, - originator_value: &str, -) -> Vec { - let disallowed_connector_ids = if is_first_party_chat_originator(originator_value) { - FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS - } else { - DISALLOWED_CONNECTOR_IDS - }; - - connectors - .into_iter() - .filter(|connector| is_connector_allowed(connector, disallowed_connector_ids)) - .collect() -} - -fn is_connector_allowed(connector: &AppInfo, disallowed_connector_ids: &[&str]) -> bool { - let connector_id = connector.id.as_str(); - !connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX) - && !disallowed_connector_ids.contains(&connector_id) -} - #[cfg(test)] mod tests { use super::*; @@ -554,22 +516,6 @@ mod tests { assert_eq!(filtered, vec![app("delta")]); } - #[test] - fn first_party_chat_originator_filters_target_and_openai_prefixed_connectors() { - let filtered = filter_disallowed_connectors_for_originator( - vec![ - app("connector_openai_foo"), - app("asdk_app_6938a94a61d881918ef32cb999ff937c"), - app("connector_0f9c9d4592e54d0a9a12b3f44a1e2010"), - ], - "codex_atlas", - ); - assert_eq!( - filtered, - vec![app("asdk_app_6938a94a61d881918ef32cb999ff937c"),] - ); - } - fn merged_app(id: &str, is_accessible: bool) -> AppInfo { AppInfo { id: id.to_string(), diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index d9ba1ca3f..a384ffc00 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -149,6 +149,7 @@ use crate::mcp::effective_mcp_servers; use crate::mcp::maybe_prompt_and_install_mcp_dependencies; use crate::mcp::with_codex_apps_mcp; use crate::mcp_connection_manager::McpConnectionManager; +use crate::mcp_connection_manager::codex_apps_tools_cache_key; use crate::mcp_connection_manager::filter_codex_apps_mcp_tools_only; use crate::mcp_connection_manager::filter_mcp_tools_by_name; use crate::mcp_connection_manager::filter_non_codex_apps_mcp_tools_only; @@ -1420,6 +1421,8 @@ impl Session { &session_configuration.approval_policy, tx_event.clone(), sandbox_state, + config.codex_home.clone(), + codex_apps_tools_cache_key(auth), ) .await; { @@ -3068,6 +3071,8 @@ impl Session { &turn_context.config.permissions.approval_policy, self.get_tx_event(), sandbox_state, + config.codex_home.clone(), + codex_apps_tools_cache_key(auth.as_ref()), ) .await; { diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index 884e32dd1..a34477850 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -19,15 +19,18 @@ use crate::CodexAuth; use crate::SandboxState; use crate::config::Config; use crate::config::types::AppsConfigToml; +use crate::default_client::is_first_party_chat_originator; +use crate::default_client::originator; use crate::features::Feature; use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; use crate::mcp::auth::compute_auth_statuses; use crate::mcp::with_codex_apps_mcp; -use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT; use crate::mcp_connection_manager::McpConnectionManager; +use crate::mcp_connection_manager::codex_apps_tools_cache_key; use crate::token_data::TokenData; pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600); +const CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS: Duration = Duration::from_secs(30); #[derive(Clone, PartialEq, Eq)] struct AccessibleConnectorsCacheKey { @@ -47,10 +50,20 @@ struct CachedAccessibleConnectors { static ACCESSIBLE_CONNECTORS_CACHE: LazyLock>> = LazyLock::new(|| StdMutex::new(None)); +#[derive(Debug, Clone)] +pub struct AccessibleConnectorsStatus { + pub connectors: Vec, + pub codex_apps_ready: bool, +} + pub async fn list_accessible_connectors_from_mcp_tools( config: &Config, ) -> anyhow::Result> { - list_accessible_connectors_from_mcp_tools_with_options(config, false).await + Ok( + list_accessible_connectors_from_mcp_tools_with_options_and_status(config, false) + .await? + .connectors, + ) } pub async fn list_cached_accessible_connectors_from_mcp_tools( @@ -63,15 +76,29 @@ pub async fn list_cached_accessible_connectors_from_mcp_tools( let auth_manager = auth_manager_from_config(config); let auth = auth_manager.auth().await; let cache_key = accessible_connectors_cache_key(config, auth.as_ref()); - read_cached_accessible_connectors(&cache_key) + read_cached_accessible_connectors(&cache_key).map(filter_disallowed_connectors) } pub async fn list_accessible_connectors_from_mcp_tools_with_options( config: &Config, force_refetch: bool, ) -> anyhow::Result> { + Ok( + list_accessible_connectors_from_mcp_tools_with_options_and_status(config, force_refetch) + .await? + .connectors, + ) +} + +pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status( + config: &Config, + force_refetch: bool, +) -> anyhow::Result { if !config.features.enabled(Feature::Apps) { - return Ok(Vec::new()); + return Ok(AccessibleConnectorsStatus { + connectors: Vec::new(), + codex_apps_ready: true, + }); } let auth_manager = auth_manager_from_config(config); @@ -79,12 +106,19 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options( let cache_key = accessible_connectors_cache_key(config, auth.as_ref()); if !force_refetch && let Some(cached_connectors) = read_cached_accessible_connectors(&cache_key) { - return Ok(cached_connectors); + let cached_connectors = filter_disallowed_connectors(cached_connectors); + return Ok(AccessibleConnectorsStatus { + connectors: cached_connectors, + codex_apps_ready: true, + }); } let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config); if mcp_servers.is_empty() { - return Ok(Vec::new()); + return Ok(AccessibleConnectorsStatus { + connectors: Vec::new(), + codex_apps_ready: true, + }); } let auth_status_entries = @@ -107,6 +141,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options( &config.permissions.approval_policy, tx_event, sandbox_state, + config.codex_home.clone(), + codex_apps_tools_cache_key(auth.as_ref()), ) .await; @@ -120,23 +156,45 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options( ); } + let mut tools = mcp_connection_manager.list_all_tools().await; + let mut should_reload_tools = false; let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) { - let timeout = cfg.startup_timeout_sec.unwrap_or(DEFAULT_STARTUP_TIMEOUT); - mcp_connection_manager - .wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout) - .await + let immediate_ready = mcp_connection_manager + .wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, Duration::ZERO) + .await; + if immediate_ready { + true + } else if tools.is_empty() { + let timeout = cfg + .startup_timeout_sec + .unwrap_or(CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS); + let ready = mcp_connection_manager + .wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout) + .await; + should_reload_tools = ready; + ready + } else { + false + } } else { false }; + if should_reload_tools { + tools = mcp_connection_manager.list_all_tools().await; + } + if codex_apps_ready { + cancel_token.cancel(); + } - let tools = mcp_connection_manager.list_all_tools().await; - cancel_token.cancel(); - - let accessible_connectors = accessible_connectors_from_mcp_tools(&tools); + let accessible_connectors = + filter_disallowed_connectors(accessible_connectors_from_mcp_tools(&tools)); if codex_apps_ready || !accessible_connectors.is_empty() { write_cached_accessible_connectors(cache_key, &accessible_connectors); } - Ok(accessible_connectors) + Ok(AccessibleConnectorsStatus { + connectors: accessible_connectors, + codex_apps_ready, + }) } fn accessible_connectors_cache_key( @@ -288,6 +346,48 @@ pub fn with_app_enabled_state(mut connectors: Vec, config: &Config) -> connectors } +const DISALLOWED_CONNECTOR_IDS: &[&str] = &[ + "asdk_app_6938a94a61d881918ef32cb999ff937c", + "connector_2b0a9009c9c64bf9933a3dae3f2b1254", + "connector_68de829bf7648191acd70a907364c67c", + "connector_68e004f14af881919eb50893d3d9f523", + "connector_69272cb413a081919685ec3c88d1744e", +]; +const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] = + &["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"]; +const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_"; + +pub fn filter_disallowed_connectors(connectors: Vec) -> Vec { + filter_disallowed_connectors_for_originator(connectors, originator().value.as_str()) +} + +pub(crate) fn is_connector_id_allowed(connector_id: &str) -> bool { + is_connector_id_allowed_for_originator(connector_id, originator().value.as_str()) +} + +fn filter_disallowed_connectors_for_originator( + connectors: Vec, + originator_value: &str, +) -> Vec { + connectors + .into_iter() + .filter(|connector| { + is_connector_id_allowed_for_originator(connector.id.as_str(), originator_value) + }) + .collect() +} + +fn is_connector_id_allowed_for_originator(connector_id: &str, originator_value: &str) -> bool { + let disallowed_connector_ids = if is_first_party_chat_originator(originator_value) { + FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS + } else { + DISALLOWED_CONNECTOR_IDS + }; + + !connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX) + && !disallowed_connector_ids.contains(&connector_id) +} + fn read_apps_config(config: &Config) -> Option { let effective_config = config.config_layer_stack.effective_config(); let apps_config = effective_config.as_table()?.get("apps")?.clone(); @@ -368,3 +468,67 @@ pub fn connector_name_slug(name: &str) -> String { fn format_connector_label(name: &str, _id: &str) -> String { name.to_string() } + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + fn app(id: &str) -> AppInfo { + AppInfo { + id: id.to_string(), + name: id.to_string(), + description: None, + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: None, + branding: None, + app_metadata: None, + labels: None, + is_accessible: false, + is_enabled: true, + } + } + + #[test] + fn filter_disallowed_connectors_allows_non_disallowed_connectors() { + let filtered = filter_disallowed_connectors(vec![app("asdk_app_hidden"), app("alpha")]); + assert_eq!(filtered, vec![app("asdk_app_hidden"), app("alpha")]); + } + + #[test] + fn filter_disallowed_connectors_filters_openai_prefix() { + let filtered = filter_disallowed_connectors(vec![ + app("connector_openai_foo"), + app("connector_openai_bar"), + app("gamma"), + ]); + assert_eq!(filtered, vec![app("gamma")]); + } + + #[test] + fn filter_disallowed_connectors_filters_disallowed_connector_ids() { + let filtered = filter_disallowed_connectors(vec![ + app("asdk_app_6938a94a61d881918ef32cb999ff937c"), + app("delta"), + ]); + assert_eq!(filtered, vec![app("delta")]); + } + + #[test] + fn first_party_chat_originator_filters_target_and_openai_prefixed_connectors() { + let filtered = filter_disallowed_connectors_for_originator( + vec![ + app("connector_openai_foo"), + app("asdk_app_6938a94a61d881918ef32cb999ff937c"), + app("connector_0f9c9d4592e54d0a9a12b3f44a1e2010"), + ], + "codex_atlas", + ); + assert_eq!( + filtered, + vec![app("asdk_app_6938a94a61d881918ef32cb999ff937c"),] + ); + } +} diff --git a/codex-rs/core/src/mcp/mod.rs b/codex-rs/core/src/mcp/mod.rs index 681b1674b..f2c255751 100644 --- a/codex-rs/core/src/mcp/mod.rs +++ b/codex-rs/core/src/mcp/mod.rs @@ -24,6 +24,7 @@ use crate::features::Feature; use crate::mcp::auth::compute_auth_statuses; use crate::mcp_connection_manager::McpConnectionManager; use crate::mcp_connection_manager::SandboxState; +use crate::mcp_connection_manager::codex_apps_tools_cache_key; const MCP_TOOL_NAME_PREFIX: &str = "mcp"; const MCP_TOOL_NAME_DELIMITER: &str = "__"; @@ -208,6 +209,8 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent &config.permissions.approval_policy, tx_event, sandbox_state, + config.codex_home.clone(), + codex_apps_tools_cache_key(auth.as_ref()), ) .await; diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index c872a9732..c76e19809 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -12,8 +12,9 @@ use std::env; use std::ffi::OsString; use std::path::PathBuf; use std::sync::Arc; -use std::sync::LazyLock; use std::sync::Mutex as StdMutex; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; use std::time::Duration; use std::time::Instant; @@ -76,6 +77,7 @@ use tracing::warn; use crate::codex::INITIAL_SUBMIT_ID; use crate::config::types::McpServerConfig; use crate::config::types::McpServerTransportConfig; +use crate::connectors::is_connector_id_allowed; /// Delimiter used to separate the server name from the tool name in a fully /// qualified tool name. @@ -91,7 +93,8 @@ pub const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10); /// Default timeout for individual tool calls. const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(60); -const CODEX_APPS_TOOLS_CACHE_TTL: Duration = Duration::from_secs(3600); +const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 1; +const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools"; const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms"; const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str = "codex.mcp.tools.fetch_uncached.duration_ms"; const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str = "codex.mcp.tools.cache_write.duration_ms"; @@ -123,6 +126,27 @@ fn sha1_hex(s: &str) -> String { format!("{sha1:x}") } +pub(crate) fn codex_apps_tools_cache_key( + auth: Option<&crate::CodexAuth>, +) -> CodexAppsToolsCacheKey { + let token_data = auth.and_then(|auth| auth.get_token_data().ok()); + let account_id = token_data + .as_ref() + .and_then(|token_data| token_data.account_id.clone()); + let chatgpt_user_id = token_data + .as_ref() + .and_then(|token_data| token_data.id_token.chatgpt_user_id.clone()); + let is_workspace_account = token_data + .as_ref() + .is_some_and(|token_data| token_data.id_token.is_workspace_account()); + + CodexAppsToolsCacheKey { + account_id, + chatgpt_user_id, + is_workspace_account, + } +} + fn qualify_tools(tools: I) -> HashMap where I: IntoIterator, @@ -165,7 +189,7 @@ where qualified_tools } -#[derive(Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub(crate) struct ToolInfo { pub(crate) server_name: String, pub(crate) tool_name: String, @@ -174,14 +198,40 @@ pub(crate) struct ToolInfo { pub(crate) connector_name: Option, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct CodexAppsToolsCacheKey { + account_id: Option, + chatgpt_user_id: Option, + is_workspace_account: bool, +} + #[derive(Clone)] -struct CachedCodexAppsTools { - expires_at: Instant, +struct CodexAppsToolsCacheContext { + codex_home: PathBuf, + user_key: CodexAppsToolsCacheKey, +} + +impl CodexAppsToolsCacheContext { + fn cache_path(&self) -> PathBuf { + let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default(); + let user_key_hash = sha1_hex(&user_key_json); + self.codex_home + .join(CODEX_APPS_TOOLS_CACHE_DIR) + .join(format!("{user_key_hash}.json")) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CodexAppsToolsDiskCache { + schema_version: u8, tools: Vec, } -static CODEX_APPS_TOOLS_CACHE: LazyLock>> = - LazyLock::new(|| StdMutex::new(None)); +enum CachedCodexAppsToolsLoad { + Hit(Vec), + Missing, + Invalid, +} type ResponderMap = HashMap<(String, RequestId), oneshot::Sender>; @@ -289,9 +339,35 @@ struct ManagedClient { tool_filter: ToolFilter, tool_timeout: Option, server_supports_sandbox_state_capability: bool, + codex_apps_tools_cache_context: Option, } impl ManagedClient { + fn listed_tools(&self) -> Vec { + let total_start = Instant::now(); + if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref() + && let CachedCodexAppsToolsLoad::Hit(tools) = + load_cached_codex_apps_tools(cache_context) + { + emit_duration( + MCP_TOOLS_LIST_DURATION_METRIC, + total_start.elapsed(), + &[("cache", "hit")], + ); + return filter_tools(tools, &self.tool_filter); + } + + if self.codex_apps_tools_cache_context.is_some() { + emit_duration( + MCP_TOOLS_LIST_DURATION_METRIC, + total_start.elapsed(), + &[("cache", "miss")], + ); + } + + self.tools.clone() + } + /// Returns once the server has ack'd the sandbox state update. async fn notify_sandbox_state_change(&self, sandbox_state: &SandboxState) -> Result<()> { if !self.server_supports_sandbox_state_capability { @@ -312,6 +388,8 @@ impl ManagedClient { #[derive(Clone)] struct AsyncManagedClient { client: Shared>>, + startup_snapshot: Option>, + startup_complete: Arc, } impl AsyncManagedClient { @@ -322,33 +400,63 @@ impl AsyncManagedClient { cancel_token: CancellationToken, tx_event: Sender, elicitation_requests: ElicitationRequestManager, + codex_apps_tools_cache_context: Option, ) -> Self { let tool_filter = ToolFilter::from_config(&config); + let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot( + &server_name, + codex_apps_tools_cache_context.as_ref(), + ) + .map(|tools| filter_tools(tools, &tool_filter)); + let startup_tool_filter = tool_filter; + let startup_complete = Arc::new(AtomicBool::new(false)); + let startup_complete_for_fut = Arc::clone(&startup_complete); let fut = async move { - if let Err(error) = validate_mcp_server_name(&server_name) { - return Err(error.into()); - } + let outcome = async { + if let Err(error) = validate_mcp_server_name(&server_name) { + return Err(error.into()); + } - let client = - Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?); - match start_server_task( - server_name, - client, - config.startup_timeout_sec.or(Some(DEFAULT_STARTUP_TIMEOUT)), - config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT), - tool_filter, - tx_event, - elicitation_requests, - ) - .or_cancel(&cancel_token) - .await - { - Ok(result) => result, - Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled), + let client = + Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?); + match start_server_task( + server_name, + client, + StartServerTaskParams { + startup_timeout: config + .startup_timeout_sec + .or(Some(DEFAULT_STARTUP_TIMEOUT)), + tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT), + tool_filter: startup_tool_filter, + tx_event, + elicitation_requests, + codex_apps_tools_cache_context, + }, + ) + .or_cancel(&cancel_token) + .await + { + Ok(result) => result, + Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled), + } } + .await; + + startup_complete_for_fut.store(true, Ordering::Release); + outcome }; + let client = fut.boxed().shared(); + if startup_snapshot.is_some() { + let startup_task = client.clone(); + tokio::spawn(async move { + let _ = startup_task.await; + }); + } + Self { - client: fut.boxed().shared(), + client, + startup_snapshot, + startup_complete, } } @@ -356,6 +464,24 @@ impl AsyncManagedClient { self.client.clone().await } + fn startup_snapshot_while_initializing(&self) -> Option> { + if !self.startup_complete.load(Ordering::Acquire) { + return self.startup_snapshot.clone(); + } + None + } + + async fn listed_tools(&self) -> Option> { + if let Some(startup_tools) = self.startup_snapshot_while_initializing() { + return Some(startup_tools); + } + + match self.client().await { + Ok(client) => Some(client.listed_tools()), + Err(_) => self.startup_snapshot.clone(), + } + } + async fn notify_sandbox_state_change(&self, sandbox_state: &SandboxState) -> Result<()> { let managed = self.client().await?; managed.notify_sandbox_state_change(sandbox_state).await @@ -417,6 +543,8 @@ impl McpConnectionManager { approval_policy: &Constrained, tx_event: Sender, initial_sandbox_state: SandboxState, + codex_home: PathBuf, + codex_apps_tools_cache_key: CodexAppsToolsCacheKey, ) -> (Self, CancellationToken) { let cancel_token = CancellationToken::new(); let mut clients = HashMap::new(); @@ -433,6 +561,14 @@ impl McpConnectionManager { }, ) .await; + let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME { + Some(CodexAppsToolsCacheContext { + codex_home: codex_home.clone(), + user_key: codex_apps_tools_cache_key.clone(), + }) + } else { + None + }; let async_managed_client = AsyncManagedClient::new( server_name.clone(), cfg, @@ -440,6 +576,7 @@ impl McpConnectionManager { cancel_token.clone(), tx_event.clone(), elicitation_requests.clone(), + codex_apps_tools_cache_context, ); clients.insert(server_name.clone(), async_managed_client.clone()); let tx_event = tx_event.clone(); @@ -575,29 +712,11 @@ impl McpConnectionManager { #[instrument(level = "trace", skip_all)] pub async fn list_all_tools(&self) -> HashMap { let mut tools = HashMap::new(); - for (server_name, managed_client) in &self.clients { - let client = managed_client.client().await.ok(); - if let Some(client) = client { - let rmcp_client = client.client; - let tool_timeout = client.tool_timeout; - let tool_filter = client.tool_filter; - let mut server_tools = client.tools; - - if server_name == CODEX_APPS_MCP_SERVER_NAME { - match list_tools_for_client(server_name, &rmcp_client, tool_timeout).await { - Ok(fresh_or_cached_tools) => { - server_tools = fresh_or_cached_tools; - } - Err(err) => { - warn!( - "Failed to refresh tools for MCP server '{server_name}', using startup snapshot: {err:#}" - ); - } - } - } - - tools.extend(qualify_tools(filter_tools(server_tools, tool_filter))); - } + for managed_client in self.clients.values() { + let Some(server_tools) = managed_client.listed_tools().await else { + continue; + }; + tools.extend(qualify_tools(server_tools)); } tools } @@ -615,6 +734,8 @@ impl McpConnectionManager { .await .context("failed to get client")?; + let list_start = Instant::now(); + let fetch_start = Instant::now(); let tools = list_tools_for_client_uncached( CODEX_APPS_MCP_SERVER_NAME, &managed_client.client, @@ -624,8 +745,22 @@ impl McpConnectionManager { .with_context(|| { format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'") })?; + emit_duration( + MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, + fetch_start.elapsed(), + &[], + ); - write_cached_codex_apps_tools(&tools); + write_cached_codex_apps_tools_if_needed( + CODEX_APPS_MCP_SERVER_NAME, + managed_client.codex_apps_tools_cache_context.as_ref(), + &tools, + ); + emit_duration( + MCP_TOOLS_LIST_DURATION_METRIC, + list_start.elapsed(), + &[("cache", "miss")], + ); Ok(()) } @@ -934,7 +1069,7 @@ impl ToolFilter { } } -fn filter_tools(tools: Vec, filter: ToolFilter) -> Vec { +fn filter_tools(tools: Vec, filter: &ToolFilter) -> Vec { tools .into_iter() .filter(|tool| filter.allows(&tool.tool_name)) @@ -1077,14 +1212,17 @@ fn elicitation_capability_for_server(server_name: &str) -> Option, - startup_timeout: Option, // TODO: cancel_token should handle this. - tool_timeout: Duration, - tool_filter: ToolFilter, - tx_event: Sender, - elicitation_requests: ElicitationRequestManager, + params: StartServerTaskParams, ) -> Result { + let StartServerTaskParams { + startup_timeout, + tool_timeout, + tool_filter, + tx_event, + elicitation_requests, + codex_apps_tools_cache_context, + } = params; let elicitation = elicitation_capability_for_server(&server_name); - let params = InitializeRequestParams { meta: None, capabilities: ClientCapabilities { @@ -1113,9 +1251,29 @@ async fn start_server_task( .await .map_err(StartupOutcomeError::from)?; - let tools = list_tools_for_client(&server_name, &client, startup_timeout) + let list_start = Instant::now(); + let fetch_start = Instant::now(); + let tools = list_tools_for_client_uncached(&server_name, &client, startup_timeout) .await .map_err(StartupOutcomeError::from)?; + emit_duration( + MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, + fetch_start.elapsed(), + &[], + ); + write_cached_codex_apps_tools_if_needed( + &server_name, + codex_apps_tools_cache_context.as_ref(), + &tools, + ); + if server_name == CODEX_APPS_MCP_SERVER_NAME { + emit_duration( + MCP_TOOLS_LIST_DURATION_METRIC, + list_start.elapsed(), + &[("cache", "miss")], + ); + } + let tools = filter_tools(tools, &tool_filter); let server_supports_sandbox_state_capability = initialize_result .capabilities @@ -1130,11 +1288,21 @@ async fn start_server_task( tool_timeout: Some(tool_timeout), tool_filter, server_supports_sandbox_state_capability, + codex_apps_tools_cache_context, }; Ok(managed) } +struct StartServerTaskParams { + startup_timeout: Option, // TODO: cancel_token should handle this. + tool_timeout: Duration, + tool_filter: ToolFilter, + tx_event: Sender, + elicitation_requests: ElicitationRequestManager, + codex_apps_tools_cache_context: Option, +} + async fn make_rmcp_client( server_name: &str, transport: McpServerTransportConfig, @@ -1179,80 +1347,99 @@ async fn make_rmcp_client( } } -async fn list_tools_for_client( +fn write_cached_codex_apps_tools_if_needed( server_name: &str, - client: &Arc, - timeout: Option, -) -> Result> { - let total_start = Instant::now(); - if server_name == CODEX_APPS_MCP_SERVER_NAME - && let Some(cached_tools) = read_cached_codex_apps_tools() - { - emit_duration( - MCP_TOOLS_LIST_DURATION_METRIC, - total_start.elapsed(), - &[("cache", "hit")], - ); - return Ok(cached_tools); + cache_context: Option<&CodexAppsToolsCacheContext>, + tools: &[ToolInfo], +) { + if server_name != CODEX_APPS_MCP_SERVER_NAME { + return; } - let fetch_start = Instant::now(); - let tools = list_tools_for_client_uncached(server_name, client, timeout).await?; - emit_duration( - MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC, - fetch_start.elapsed(), - &[], - ); - - if server_name == CODEX_APPS_MCP_SERVER_NAME { + if let Some(cache_context) = cache_context { let cache_write_start = Instant::now(); - write_cached_codex_apps_tools(&tools); + write_cached_codex_apps_tools(cache_context, tools); emit_duration( MCP_TOOLS_CACHE_WRITE_DURATION_METRIC, cache_write_start.elapsed(), &[], ); } - - if server_name == CODEX_APPS_MCP_SERVER_NAME { - emit_duration( - MCP_TOOLS_LIST_DURATION_METRIC, - total_start.elapsed(), - &[("cache", "miss")], - ); - } - Ok(tools) } -fn read_cached_codex_apps_tools() -> Option> { - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - let now = Instant::now(); - - if let Some(cached) = cache_guard.as_ref() - && now < cached.expires_at - { - return Some(cached.tools.clone()); +fn load_startup_cached_codex_apps_tools_snapshot( + server_name: &str, + cache_context: Option<&CodexAppsToolsCacheContext>, +) -> Option> { + if server_name != CODEX_APPS_MCP_SERVER_NAME { + return None; } - if cache_guard - .as_ref() - .is_some_and(|cached| now >= cached.expires_at) - { - *cache_guard = None; + let cache_context = cache_context?; + + match load_cached_codex_apps_tools(cache_context) { + CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), + CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, } - None } -fn write_cached_codex_apps_tools(tools: &[ToolInfo]) { - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - *cache_guard = Some(CachedCodexAppsTools { - expires_at: Instant::now() + CODEX_APPS_TOOLS_CACHE_TTL, - tools: tools.to_vec(), - }); +#[cfg(test)] +fn read_cached_codex_apps_tools( + cache_context: &CodexAppsToolsCacheContext, +) -> Option> { + match load_cached_codex_apps_tools(cache_context) { + CachedCodexAppsToolsLoad::Hit(tools) => Some(tools), + CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None, + } +} + +fn load_cached_codex_apps_tools( + cache_context: &CodexAppsToolsCacheContext, +) -> CachedCodexAppsToolsLoad { + let cache_path = cache_context.cache_path(); + let bytes = match std::fs::read(cache_path) { + Ok(bytes) => bytes, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + return CachedCodexAppsToolsLoad::Missing; + } + Err(_) => return CachedCodexAppsToolsLoad::Invalid, + }; + let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) { + Ok(cache) => cache, + Err(_) => return CachedCodexAppsToolsLoad::Invalid, + }; + if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION { + return CachedCodexAppsToolsLoad::Invalid; + } + CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools)) +} + +fn write_cached_codex_apps_tools(cache_context: &CodexAppsToolsCacheContext, tools: &[ToolInfo]) { + let cache_path = cache_context.cache_path(); + if let Some(parent) = cache_path.parent() + && std::fs::create_dir_all(parent).is_err() + { + return; + } + let tools = filter_disallowed_codex_apps_tools(tools.to_vec()); + let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache { + schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION, + tools, + }) else { + return; + }; + let _ = std::fs::write(cache_path, bytes); +} + +fn filter_disallowed_codex_apps_tools(tools: Vec) -> Vec { + tools + .into_iter() + .filter(|tool| { + tool.connector_id + .as_deref() + .is_none_or(is_connector_id_allowed) + }) + .collect() } fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) { @@ -1267,7 +1454,7 @@ async fn list_tools_for_client_uncached( timeout: Option, ) -> Result> { let resp = client.list_tools_with_connector_ids(None, timeout).await?; - Ok(resp + let tools = resp .tools .into_iter() .map(|tool| { @@ -1288,7 +1475,11 @@ async fn list_tools_for_client_uncached( connector_name, } }) - .collect()) + .collect(); + if server_name == CODEX_APPS_MCP_SERVER_NAME { + return Ok(filter_disallowed_codex_apps_tools(tools)); + } + Ok(tools) } fn validate_mcp_server_name(server_name: &str) -> Result<()> { @@ -1376,6 +1567,7 @@ mod tests { use rmcp::model::JsonObject; use std::collections::HashSet; use std::sync::Arc; + use tempfile::tempdir; fn create_test_tool(server_name: &str, tool_name: &str) -> ToolInfo { ToolInfo { @@ -1397,19 +1589,31 @@ mod tests { } } - fn with_clean_codex_apps_tools_cache(f: impl FnOnce() -> T) -> T { - let previous_cache = { - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - cache_guard.take() - }; - let result = f(); - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - *cache_guard = previous_cache; - result + fn create_test_tool_with_connector( + server_name: &str, + tool_name: &str, + connector_id: &str, + connector_name: Option<&str>, + ) -> ToolInfo { + let mut tool = create_test_tool(server_name, tool_name); + tool.connector_id = Some(connector_id.to_string()); + tool.connector_name = connector_name.map(ToOwned::to_owned); + tool + } + + fn create_codex_apps_tools_cache_context( + codex_home: PathBuf, + account_id: Option<&str>, + chatgpt_user_id: Option<&str>, + ) -> CodexAppsToolsCacheContext { + CodexAppsToolsCacheContext { + codex_home, + user_key: CodexAppsToolsCacheKey { + account_id: account_id.map(ToOwned::to_owned), + chatgpt_user_id: chatgpt_user_id.map(ToOwned::to_owned), + is_workspace_account: false, + }, + } } #[test] @@ -1586,9 +1790,9 @@ mod tests { disabled: HashSet::from(["tool_a".to_string()]), }; - let filtered: Vec<_> = filter_tools(server1_tools, server1_filter) + let filtered: Vec<_> = filter_tools(server1_tools, &server1_filter) .into_iter() - .chain(filter_tools(server2_tools, server2_filter)) + .chain(filter_tools(server2_tools, &server2_filter)) .collect(); assert_eq!(filtered.len(), 1); @@ -1598,43 +1802,260 @@ mod tests { #[test] fn codex_apps_tools_cache_is_overwritten_by_last_write() { - with_clean_codex_apps_tools_cache(|| { - let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")]; - let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")]; + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")]; + let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")]; - write_cached_codex_apps_tools(&tools_gateway_1); - let cached_gateway_1 = - read_cached_codex_apps_tools().expect("cache entry exists for first write"); - assert_eq!(cached_gateway_1[0].tool_name, "one"); + write_cached_codex_apps_tools(&cache_context, &tools_gateway_1); + let cached_gateway_1 = read_cached_codex_apps_tools(&cache_context) + .expect("cache entry exists for first write"); + assert_eq!(cached_gateway_1[0].tool_name, "one"); - write_cached_codex_apps_tools(&tools_gateway_2); - let cached_gateway_2 = - read_cached_codex_apps_tools().expect("cache entry exists for second write"); - assert_eq!(cached_gateway_2[0].tool_name, "two"); - }); + write_cached_codex_apps_tools(&cache_context, &tools_gateway_2); + let cached_gateway_2 = read_cached_codex_apps_tools(&cache_context) + .expect("cache entry exists for second write"); + assert_eq!(cached_gateway_2[0].tool_name, "two"); } #[test] - fn codex_apps_tools_cache_is_cleared_when_expired() { - with_clean_codex_apps_tools_cache(|| { - let tools = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "stale_tool")]; - write_cached_codex_apps_tools(&tools); + fn codex_apps_tools_cache_is_scoped_per_user() { + let codex_home = tempdir().expect("tempdir"); + let cache_context_user_1 = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let cache_context_user_2 = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-two"), + Some("user-two"), + ); + let tools_user_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")]; + let tools_user_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")]; - { - let mut cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - cache_guard.as_mut().expect("cache exists").expires_at = - Instant::now() - Duration::from_secs(1); - } + write_cached_codex_apps_tools(&cache_context_user_1, &tools_user_1); + write_cached_codex_apps_tools(&cache_context_user_2, &tools_user_2); - assert!(read_cached_codex_apps_tools().is_none()); + let read_user_1 = + read_cached_codex_apps_tools(&cache_context_user_1).expect("cache entry for user one"); + let read_user_2 = + read_cached_codex_apps_tools(&cache_context_user_2).expect("cache entry for user two"); - let cache_guard = CODEX_APPS_TOOLS_CACHE - .lock() - .unwrap_or_else(std::sync::PoisonError::into_inner); - assert!(cache_guard.is_none()); - }); + assert_eq!(read_user_1[0].tool_name, "one"); + assert_eq!(read_user_2[0].tool_name, "two"); + assert_ne!( + cache_context_user_1.cache_path(), + cache_context_user_2.cache_path(), + "each user should get an isolated cache file" + ); + } + + #[test] + fn codex_apps_tools_cache_filters_disallowed_connectors() { + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let tools = vec![ + create_test_tool_with_connector( + CODEX_APPS_MCP_SERVER_NAME, + "blocked_tool", + "connector_openai_hidden", + Some("Hidden"), + ), + create_test_tool_with_connector( + CODEX_APPS_MCP_SERVER_NAME, + "allowed_tool", + "calendar", + Some("Calendar"), + ), + ]; + + write_cached_codex_apps_tools(&cache_context, &tools); + let cached = + read_cached_codex_apps_tools(&cache_context).expect("cache entry exists for user"); + + assert_eq!(cached.len(), 1); + assert_eq!(cached[0].tool_name, "allowed_tool"); + assert_eq!(cached[0].connector_id.as_deref(), Some("calendar")); + } + + #[test] + fn codex_apps_tools_cache_is_ignored_when_schema_version_mismatches() { + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let cache_path = cache_context.cache_path(); + if let Some(parent) = cache_path.parent() { + std::fs::create_dir_all(parent).expect("create parent"); + } + let bytes = serde_json::to_vec_pretty(&serde_json::json!({ + "schema_version": CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION + 1, + "tools": [create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")], + })) + .expect("serialize"); + std::fs::write(cache_path, bytes).expect("write"); + + assert!(read_cached_codex_apps_tools(&cache_context).is_none()); + } + + #[test] + fn codex_apps_tools_cache_is_ignored_when_json_is_invalid() { + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let cache_path = cache_context.cache_path(); + if let Some(parent) = cache_path.parent() { + std::fs::create_dir_all(parent).expect("create parent"); + } + std::fs::write(cache_path, b"{not json").expect("write"); + + assert!(read_cached_codex_apps_tools(&cache_context).is_none()); + } + + #[test] + fn startup_cached_codex_apps_tools_loads_from_disk_cache() { + let codex_home = tempdir().expect("tempdir"); + let cache_context = create_codex_apps_tools_cache_context( + codex_home.path().to_path_buf(), + Some("account-one"), + Some("user-one"), + ); + let cached_tools = vec![create_test_tool( + CODEX_APPS_MCP_SERVER_NAME, + "calendar_search", + )]; + write_cached_codex_apps_tools(&cache_context, &cached_tools); + + let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot( + CODEX_APPS_MCP_SERVER_NAME, + Some(&cache_context), + ); + let startup_tools = startup_snapshot.expect("expected startup snapshot to load from cache"); + + assert_eq!(startup_tools.len(), 1); + assert_eq!(startup_tools[0].server_name, CODEX_APPS_MCP_SERVER_NAME); + assert_eq!(startup_tools[0].tool_name, "calendar_search"); + } + + #[tokio::test] + async fn list_all_tools_uses_startup_snapshot_while_client_is_pending() { + let startup_tools = vec![create_test_tool( + CODEX_APPS_MCP_SERVER_NAME, + "calendar_create_event", + )]; + let pending_client = + futures::future::pending::>() + .boxed() + .shared(); + let approval_policy = Constrained::allow_any(AskForApproval::OnFailure); + let mut manager = McpConnectionManager::new_uninitialized(&approval_policy); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: pending_client, + startup_snapshot: Some(startup_tools), + startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)), + }, + ); + + let tools = manager.list_all_tools().await; + let tool = tools + .get("mcp__codex_apps__calendar_create_event") + .expect("tool from startup cache"); + assert_eq!(tool.server_name, CODEX_APPS_MCP_SERVER_NAME); + assert_eq!(tool.tool_name, "calendar_create_event"); + } + + #[tokio::test] + async fn list_all_tools_blocks_while_client_is_pending_without_startup_snapshot() { + let pending_client = + futures::future::pending::>() + .boxed() + .shared(); + let approval_policy = Constrained::allow_any(AskForApproval::OnFailure); + let mut manager = McpConnectionManager::new_uninitialized(&approval_policy); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: pending_client, + startup_snapshot: None, + startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)), + }, + ); + + let timeout_result = + tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await; + assert!(timeout_result.is_err()); + } + + #[tokio::test] + async fn list_all_tools_does_not_block_when_startup_snapshot_cache_hit_is_empty() { + let pending_client = + futures::future::pending::>() + .boxed() + .shared(); + let approval_policy = Constrained::allow_any(AskForApproval::OnFailure); + let mut manager = McpConnectionManager::new_uninitialized(&approval_policy); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: pending_client, + startup_snapshot: Some(Vec::new()), + startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)), + }, + ); + + let timeout_result = + tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await; + let tools = timeout_result.expect("cache-hit startup snapshot should not block"); + assert!(tools.is_empty()); + } + + #[tokio::test] + async fn list_all_tools_uses_startup_snapshot_when_client_startup_fails() { + let startup_tools = vec![create_test_tool( + CODEX_APPS_MCP_SERVER_NAME, + "calendar_create_event", + )]; + let failed_client = futures::future::ready::>( + Err(StartupOutcomeError::Failed { + error: "startup failed".to_string(), + }), + ) + .boxed() + .shared(); + let approval_policy = Constrained::allow_any(AskForApproval::OnFailure); + let mut manager = McpConnectionManager::new_uninitialized(&approval_policy); + let startup_complete = Arc::new(std::sync::atomic::AtomicBool::new(true)); + manager.clients.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + AsyncManagedClient { + client: failed_client, + startup_snapshot: Some(startup_tools), + startup_complete, + }, + ); + + let tools = manager.list_all_tools().await; + let tool = tools + .get("mcp__codex_apps__calendar_create_event") + .expect("tool from startup cache"); + assert_eq!(tool.server_name, CODEX_APPS_MCP_SERVER_NAME); + assert_eq!(tool.tool_name, "calendar_create_event"); } #[test] diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 9e746b4c5..7dcab8010 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -539,6 +539,7 @@ pub(crate) struct ChatWidget { mcp_startup_status: Option>, connectors_cache: ConnectorsCacheState, connectors_prefetch_in_flight: bool, + connectors_force_refetch_pending: bool, // Queue of interruptive UI events deferred during an active write cycle interrupts: InterruptManager, // Accumulates the current reasoning block text to extract a header @@ -2650,6 +2651,7 @@ impl ChatWidget { mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), connectors_prefetch_in_flight: false, + connectors_force_refetch_pending: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -2813,6 +2815,7 @@ impl ChatWidget { mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), connectors_prefetch_in_flight: false, + connectors_force_refetch_pending: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -2965,6 +2968,7 @@ impl ChatWidget { mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), connectors_prefetch_in_flight: false, + connectors_force_refetch_pending: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -4601,7 +4605,13 @@ impl ChatWidget { } fn prefetch_connectors_with_options(&mut self, force_refetch: bool) { - if !self.connectors_enabled() || self.connectors_prefetch_in_flight { + if !self.connectors_enabled() { + return; + } + if self.connectors_prefetch_in_flight { + if force_refetch { + self.connectors_force_refetch_pending = true; + } return; } @@ -4613,8 +4623,8 @@ impl ChatWidget { let config = self.config.clone(); let app_event_tx = self.app_event_tx.clone(); tokio::spawn(async move { - let accessible_connectors = - match connectors::list_accessible_connectors_from_mcp_tools_with_options( + let accessible_result = + match connectors::list_accessible_connectors_from_mcp_tools_with_options_and_status( &config, force_refetch, ) @@ -4629,6 +4639,9 @@ impl ChatWidget { return; } }; + let should_schedule_force_refetch = + !force_refetch && !accessible_result.codex_apps_ready; + let accessible_connectors = accessible_result.connectors; app_event_tx.send(AppEvent::ConnectorsLoaded { result: Ok(ConnectorsSnapshot { @@ -4638,7 +4651,8 @@ impl ChatWidget { }); let result: Result = async { - let all_connectors = connectors::list_all_connectors(&config).await?; + let all_connectors = + connectors::list_all_connectors_with_options(&config, force_refetch).await?; let connectors = connectors::merge_connectors_with_accessible( all_connectors, accessible_connectors, @@ -4653,6 +4667,12 @@ impl ChatWidget { result, is_final: true, }); + + if should_schedule_force_refetch { + app_event_tx.send(AppEvent::RefreshConnectors { + force_refetch: true, + }); + } }); } @@ -6821,8 +6841,13 @@ impl ChatWidget { result: Result, is_final: bool, ) { + let mut trigger_pending_force_refetch = false; if is_final { self.connectors_prefetch_in_flight = false; + if self.connectors_force_refetch_pending { + self.connectors_force_refetch_pending = false; + trigger_pending_force_refetch = true; + } } match result { @@ -6857,13 +6882,16 @@ impl ChatWidget { Err(err) => { if matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) { warn!("failed to refresh apps list; retaining current apps snapshot: {err}"); - return; + } else { + self.connectors_cache = ConnectorsCacheState::Failed(err); + self.bottom_pane.set_connectors_snapshot(None); } - - self.connectors_cache = ConnectorsCacheState::Failed(err); - self.bottom_pane.set_connectors_snapshot(None); } } + + if trigger_pending_force_refetch { + self.prefetch_connectors_with_options(true); + } } pub(crate) fn update_connector_enabled(&mut self, connector_id: &str, enabled: bool) { diff --git a/codex-rs/tui/src/chatwidget/tests.rs b/codex-rs/tui/src/chatwidget/tests.rs index 829c39d0a..d6d2ab1ad 100644 --- a/codex-rs/tui/src/chatwidget/tests.rs +++ b/codex-rs/tui/src/chatwidget/tests.rs @@ -1627,6 +1627,7 @@ async fn make_chatwidget_manual( mcp_startup_status: None, connectors_cache: ConnectorsCacheState::default(), connectors_prefetch_in_flight: false, + connectors_force_refetch_pending: false, interrupts: InterruptManager::new(), reasoning_buffer: String::new(), full_reasoning_buffer: String::new(), @@ -4582,6 +4583,42 @@ async fn apps_refresh_failure_keeps_existing_full_snapshot() { ); } +#[tokio::test] +async fn apps_refresh_failure_with_cached_snapshot_triggers_pending_force_refetch() { + let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await; + chat.config.features.enable(Feature::Apps); + chat.bottom_pane.set_connectors_enabled(true); + chat.connectors_prefetch_in_flight = true; + chat.connectors_force_refetch_pending = true; + + let full_connectors = vec![codex_chatgpt::connectors::AppInfo { + id: "unit_test_apps_refresh_failure_pending_connector".to_string(), + name: "Notion".to_string(), + description: Some("Workspace docs".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + branding: None, + app_metadata: None, + labels: None, + install_url: Some("https://example.test/notion".to_string()), + is_accessible: true, + is_enabled: true, + }]; + chat.connectors_cache = ConnectorsCacheState::Ready(ConnectorsSnapshot { + connectors: full_connectors.clone(), + }); + + chat.on_connectors_loaded(Err("failed to load apps".to_string()), true); + + assert!(chat.connectors_prefetch_in_flight); + assert!(!chat.connectors_force_refetch_pending); + assert_matches!( + &chat.connectors_cache, + ConnectorsCacheState::Ready(snapshot) if snapshot.connectors == full_connectors + ); +} + #[tokio::test] async fn apps_partial_refresh_uses_same_filtering_as_full_refresh() { let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;