[apps] Store apps tool cache in disk to reduce startup time. (#11822)

We now write MCP tools from installed apps to disk cache so that they
can be picked up instantly at startup. We still do a fresh fetch from
remote MCP server but it's non blocking unless there's a cache miss.

- [x] Store apps tool cache in disk to reduce startup time.
This commit is contained in:
Matthew Zeng 2026-02-19 22:06:51 -08:00 committed by GitHub
parent b06f91c4fe
commit 18bd6d2d71
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 848 additions and 244 deletions

View file

@ -4,8 +4,6 @@ use std::sync::LazyLock;
use std::sync::Mutex as StdMutex;
use codex_core::config::Config;
use codex_core::default_client::is_first_party_chat_originator;
use codex_core::default_client::originator;
use codex_core::features::Feature;
use codex_core::token_data::TokenData;
use serde::Deserialize;
@ -22,8 +20,10 @@ use codex_core::connectors::AppMetadata;
use codex_core::connectors::CONNECTORS_CACHE_TTL;
pub use codex_core::connectors::connector_display_label;
use codex_core::connectors::connector_install_url;
use codex_core::connectors::filter_disallowed_connectors;
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools;
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options;
pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools_with_options_and_status;
pub use codex_core::connectors::list_cached_accessible_connectors_from_mcp_tools;
use codex_core::connectors::merge_connectors;
pub use codex_core::connectors::with_app_enabled_state;
@ -106,7 +106,7 @@ pub async fn list_cached_all_connectors(config: &Config) -> Option<Vec<AppInfo>>
}
let token_data = get_chatgpt_token_data()?;
let cache_key = all_connectors_cache_key(config, &token_data);
read_cached_all_connectors(&cache_key)
read_cached_all_connectors(&cache_key).map(filter_disallowed_connectors)
}
pub async fn list_all_connectors_with_options(
@ -123,7 +123,7 @@ pub async fn list_all_connectors_with_options(
get_chatgpt_token_data().ok_or_else(|| anyhow::anyhow!("ChatGPT token not available"))?;
let cache_key = all_connectors_cache_key(config, &token_data);
if !force_refetch && let Some(cached_connectors) = read_cached_all_connectors(&cache_key) {
return Ok(cached_connectors);
return Ok(filter_disallowed_connectors(cached_connectors));
}
let mut apps = list_directory_connectors(config).await?;
@ -149,6 +149,7 @@ pub async fn list_all_connectors_with_options(
.cmp(&right.name)
.then_with(|| left.id.cmp(&right.id))
});
let connectors = filter_disallowed_connectors(connectors);
write_cached_all_connectors(cache_key, &connectors);
Ok(connectors)
}
@ -453,45 +454,6 @@ fn normalize_connector_value(value: Option<&str>) -> Option<String> {
.filter(|value| !value.is_empty())
.map(str::to_string)
}
const DISALLOWED_CONNECTOR_IDS: &[&str] = &[
"asdk_app_6938a94a61d881918ef32cb999ff937c",
"connector_2b0a9009c9c64bf9933a3dae3f2b1254",
"connector_68de829bf7648191acd70a907364c67c",
"connector_68e004f14af881919eb50893d3d9f523",
"connector_69272cb413a081919685ec3c88d1744e",
"connector_0f9c9d4592e54d0a9a12b3f44a1e2010",
];
const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] =
&["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"];
const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_";
fn filter_disallowed_connectors(connectors: Vec<AppInfo>) -> Vec<AppInfo> {
filter_disallowed_connectors_for_originator(connectors, originator().value.as_str())
}
fn filter_disallowed_connectors_for_originator(
connectors: Vec<AppInfo>,
originator_value: &str,
) -> Vec<AppInfo> {
let disallowed_connector_ids = if is_first_party_chat_originator(originator_value) {
FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS
} else {
DISALLOWED_CONNECTOR_IDS
};
connectors
.into_iter()
.filter(|connector| is_connector_allowed(connector, disallowed_connector_ids))
.collect()
}
fn is_connector_allowed(connector: &AppInfo, disallowed_connector_ids: &[&str]) -> bool {
let connector_id = connector.id.as_str();
!connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX)
&& !disallowed_connector_ids.contains(&connector_id)
}
#[cfg(test)]
mod tests {
use super::*;
@ -554,22 +516,6 @@ mod tests {
assert_eq!(filtered, vec![app("delta")]);
}
#[test]
fn first_party_chat_originator_filters_target_and_openai_prefixed_connectors() {
let filtered = filter_disallowed_connectors_for_originator(
vec![
app("connector_openai_foo"),
app("asdk_app_6938a94a61d881918ef32cb999ff937c"),
app("connector_0f9c9d4592e54d0a9a12b3f44a1e2010"),
],
"codex_atlas",
);
assert_eq!(
filtered,
vec![app("asdk_app_6938a94a61d881918ef32cb999ff937c"),]
);
}
fn merged_app(id: &str, is_accessible: bool) -> AppInfo {
AppInfo {
id: id.to_string(),

View file

@ -149,6 +149,7 @@ use crate::mcp::effective_mcp_servers;
use crate::mcp::maybe_prompt_and_install_mcp_dependencies;
use crate::mcp::with_codex_apps_mcp;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
use crate::mcp_connection_manager::filter_codex_apps_mcp_tools_only;
use crate::mcp_connection_manager::filter_mcp_tools_by_name;
use crate::mcp_connection_manager::filter_non_codex_apps_mcp_tools_only;
@ -1420,6 +1421,8 @@ impl Session {
&session_configuration.approval_policy,
tx_event.clone(),
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth),
)
.await;
{
@ -3068,6 +3071,8 @@ impl Session {
&turn_context.config.permissions.approval_policy,
self.get_tx_event(),
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth.as_ref()),
)
.await;
{

View file

@ -19,15 +19,18 @@ use crate::CodexAuth;
use crate::SandboxState;
use crate::config::Config;
use crate::config::types::AppsConfigToml;
use crate::default_client::is_first_party_chat_originator;
use crate::default_client::originator;
use crate::features::Feature;
use crate::mcp::CODEX_APPS_MCP_SERVER_NAME;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp::with_codex_apps_mcp;
use crate::mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
use crate::token_data::TokenData;
pub const CONNECTORS_CACHE_TTL: Duration = Duration::from_secs(3600);
const CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS: Duration = Duration::from_secs(30);
#[derive(Clone, PartialEq, Eq)]
struct AccessibleConnectorsCacheKey {
@ -47,10 +50,20 @@ struct CachedAccessibleConnectors {
static ACCESSIBLE_CONNECTORS_CACHE: LazyLock<StdMutex<Option<CachedAccessibleConnectors>>> =
LazyLock::new(|| StdMutex::new(None));
#[derive(Debug, Clone)]
pub struct AccessibleConnectorsStatus {
pub connectors: Vec<AppInfo>,
pub codex_apps_ready: bool,
}
pub async fn list_accessible_connectors_from_mcp_tools(
config: &Config,
) -> anyhow::Result<Vec<AppInfo>> {
list_accessible_connectors_from_mcp_tools_with_options(config, false).await
Ok(
list_accessible_connectors_from_mcp_tools_with_options_and_status(config, false)
.await?
.connectors,
)
}
pub async fn list_cached_accessible_connectors_from_mcp_tools(
@ -63,15 +76,29 @@ pub async fn list_cached_accessible_connectors_from_mcp_tools(
let auth_manager = auth_manager_from_config(config);
let auth = auth_manager.auth().await;
let cache_key = accessible_connectors_cache_key(config, auth.as_ref());
read_cached_accessible_connectors(&cache_key)
read_cached_accessible_connectors(&cache_key).map(filter_disallowed_connectors)
}
pub async fn list_accessible_connectors_from_mcp_tools_with_options(
config: &Config,
force_refetch: bool,
) -> anyhow::Result<Vec<AppInfo>> {
Ok(
list_accessible_connectors_from_mcp_tools_with_options_and_status(config, force_refetch)
.await?
.connectors,
)
}
pub async fn list_accessible_connectors_from_mcp_tools_with_options_and_status(
config: &Config,
force_refetch: bool,
) -> anyhow::Result<AccessibleConnectorsStatus> {
if !config.features.enabled(Feature::Apps) {
return Ok(Vec::new());
return Ok(AccessibleConnectorsStatus {
connectors: Vec::new(),
codex_apps_ready: true,
});
}
let auth_manager = auth_manager_from_config(config);
@ -79,12 +106,19 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options(
let cache_key = accessible_connectors_cache_key(config, auth.as_ref());
if !force_refetch && let Some(cached_connectors) = read_cached_accessible_connectors(&cache_key)
{
return Ok(cached_connectors);
let cached_connectors = filter_disallowed_connectors(cached_connectors);
return Ok(AccessibleConnectorsStatus {
connectors: cached_connectors,
codex_apps_ready: true,
});
}
let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config);
if mcp_servers.is_empty() {
return Ok(Vec::new());
return Ok(AccessibleConnectorsStatus {
connectors: Vec::new(),
codex_apps_ready: true,
});
}
let auth_status_entries =
@ -107,6 +141,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options(
&config.permissions.approval_policy,
tx_event,
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth.as_ref()),
)
.await;
@ -120,23 +156,45 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options(
);
}
let mut tools = mcp_connection_manager.list_all_tools().await;
let mut should_reload_tools = false;
let codex_apps_ready = if let Some(cfg) = mcp_servers.get(CODEX_APPS_MCP_SERVER_NAME) {
let timeout = cfg.startup_timeout_sec.unwrap_or(DEFAULT_STARTUP_TIMEOUT);
mcp_connection_manager
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout)
.await
let immediate_ready = mcp_connection_manager
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, Duration::ZERO)
.await;
if immediate_ready {
true
} else if tools.is_empty() {
let timeout = cfg
.startup_timeout_sec
.unwrap_or(CONNECTORS_READY_TIMEOUT_ON_EMPTY_TOOLS);
let ready = mcp_connection_manager
.wait_for_server_ready(CODEX_APPS_MCP_SERVER_NAME, timeout)
.await;
should_reload_tools = ready;
ready
} else {
false
}
} else {
false
};
if should_reload_tools {
tools = mcp_connection_manager.list_all_tools().await;
}
if codex_apps_ready {
cancel_token.cancel();
}
let tools = mcp_connection_manager.list_all_tools().await;
cancel_token.cancel();
let accessible_connectors = accessible_connectors_from_mcp_tools(&tools);
let accessible_connectors =
filter_disallowed_connectors(accessible_connectors_from_mcp_tools(&tools));
if codex_apps_ready || !accessible_connectors.is_empty() {
write_cached_accessible_connectors(cache_key, &accessible_connectors);
}
Ok(accessible_connectors)
Ok(AccessibleConnectorsStatus {
connectors: accessible_connectors,
codex_apps_ready,
})
}
fn accessible_connectors_cache_key(
@ -288,6 +346,48 @@ pub fn with_app_enabled_state(mut connectors: Vec<AppInfo>, config: &Config) ->
connectors
}
const DISALLOWED_CONNECTOR_IDS: &[&str] = &[
"asdk_app_6938a94a61d881918ef32cb999ff937c",
"connector_2b0a9009c9c64bf9933a3dae3f2b1254",
"connector_68de829bf7648191acd70a907364c67c",
"connector_68e004f14af881919eb50893d3d9f523",
"connector_69272cb413a081919685ec3c88d1744e",
];
const FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS: &[&str] =
&["connector_0f9c9d4592e54d0a9a12b3f44a1e2010"];
const DISALLOWED_CONNECTOR_PREFIX: &str = "connector_openai_";
pub fn filter_disallowed_connectors(connectors: Vec<AppInfo>) -> Vec<AppInfo> {
filter_disallowed_connectors_for_originator(connectors, originator().value.as_str())
}
pub(crate) fn is_connector_id_allowed(connector_id: &str) -> bool {
is_connector_id_allowed_for_originator(connector_id, originator().value.as_str())
}
fn filter_disallowed_connectors_for_originator(
connectors: Vec<AppInfo>,
originator_value: &str,
) -> Vec<AppInfo> {
connectors
.into_iter()
.filter(|connector| {
is_connector_id_allowed_for_originator(connector.id.as_str(), originator_value)
})
.collect()
}
fn is_connector_id_allowed_for_originator(connector_id: &str, originator_value: &str) -> bool {
let disallowed_connector_ids = if is_first_party_chat_originator(originator_value) {
FIRST_PARTY_CHAT_DISALLOWED_CONNECTOR_IDS
} else {
DISALLOWED_CONNECTOR_IDS
};
!connector_id.starts_with(DISALLOWED_CONNECTOR_PREFIX)
&& !disallowed_connector_ids.contains(&connector_id)
}
fn read_apps_config(config: &Config) -> Option<AppsConfigToml> {
let effective_config = config.config_layer_stack.effective_config();
let apps_config = effective_config.as_table()?.get("apps")?.clone();
@ -368,3 +468,67 @@ pub fn connector_name_slug(name: &str) -> String {
fn format_connector_label(name: &str, _id: &str) -> String {
name.to_string()
}
#[cfg(test)]
mod tests {
use super::*;
use pretty_assertions::assert_eq;
fn app(id: &str) -> AppInfo {
AppInfo {
id: id.to_string(),
name: id.to_string(),
description: None,
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
install_url: None,
branding: None,
app_metadata: None,
labels: None,
is_accessible: false,
is_enabled: true,
}
}
#[test]
fn filter_disallowed_connectors_allows_non_disallowed_connectors() {
let filtered = filter_disallowed_connectors(vec![app("asdk_app_hidden"), app("alpha")]);
assert_eq!(filtered, vec![app("asdk_app_hidden"), app("alpha")]);
}
#[test]
fn filter_disallowed_connectors_filters_openai_prefix() {
let filtered = filter_disallowed_connectors(vec![
app("connector_openai_foo"),
app("connector_openai_bar"),
app("gamma"),
]);
assert_eq!(filtered, vec![app("gamma")]);
}
#[test]
fn filter_disallowed_connectors_filters_disallowed_connector_ids() {
let filtered = filter_disallowed_connectors(vec![
app("asdk_app_6938a94a61d881918ef32cb999ff937c"),
app("delta"),
]);
assert_eq!(filtered, vec![app("delta")]);
}
#[test]
fn first_party_chat_originator_filters_target_and_openai_prefixed_connectors() {
let filtered = filter_disallowed_connectors_for_originator(
vec![
app("connector_openai_foo"),
app("asdk_app_6938a94a61d881918ef32cb999ff937c"),
app("connector_0f9c9d4592e54d0a9a12b3f44a1e2010"),
],
"codex_atlas",
);
assert_eq!(
filtered,
vec![app("asdk_app_6938a94a61d881918ef32cb999ff937c"),]
);
}
}

View file

@ -24,6 +24,7 @@ use crate::features::Feature;
use crate::mcp::auth::compute_auth_statuses;
use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::SandboxState;
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
const MCP_TOOL_NAME_PREFIX: &str = "mcp";
const MCP_TOOL_NAME_DELIMITER: &str = "__";
@ -208,6 +209,8 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent
&config.permissions.approval_policy,
tx_event,
sandbox_state,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth.as_ref()),
)
.await;

View file

@ -12,8 +12,9 @@ use std::env;
use std::ffi::OsString;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::Mutex as StdMutex;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
@ -76,6 +77,7 @@ use tracing::warn;
use crate::codex::INITIAL_SUBMIT_ID;
use crate::config::types::McpServerConfig;
use crate::config::types::McpServerTransportConfig;
use crate::connectors::is_connector_id_allowed;
/// Delimiter used to separate the server name from the tool name in a fully
/// qualified tool name.
@ -91,7 +93,8 @@ pub const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_secs(10);
/// Default timeout for individual tool calls.
const DEFAULT_TOOL_TIMEOUT: Duration = Duration::from_secs(60);
const CODEX_APPS_TOOLS_CACHE_TTL: Duration = Duration::from_secs(3600);
const CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION: u8 = 1;
const CODEX_APPS_TOOLS_CACHE_DIR: &str = "cache/codex_apps_tools";
const MCP_TOOLS_LIST_DURATION_METRIC: &str = "codex.mcp.tools.list.duration_ms";
const MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC: &str = "codex.mcp.tools.fetch_uncached.duration_ms";
const MCP_TOOLS_CACHE_WRITE_DURATION_METRIC: &str = "codex.mcp.tools.cache_write.duration_ms";
@ -123,6 +126,27 @@ fn sha1_hex(s: &str) -> String {
format!("{sha1:x}")
}
pub(crate) fn codex_apps_tools_cache_key(
auth: Option<&crate::CodexAuth>,
) -> CodexAppsToolsCacheKey {
let token_data = auth.and_then(|auth| auth.get_token_data().ok());
let account_id = token_data
.as_ref()
.and_then(|token_data| token_data.account_id.clone());
let chatgpt_user_id = token_data
.as_ref()
.and_then(|token_data| token_data.id_token.chatgpt_user_id.clone());
let is_workspace_account = token_data
.as_ref()
.is_some_and(|token_data| token_data.id_token.is_workspace_account());
CodexAppsToolsCacheKey {
account_id,
chatgpt_user_id,
is_workspace_account,
}
}
fn qualify_tools<I>(tools: I) -> HashMap<String, ToolInfo>
where
I: IntoIterator<Item = ToolInfo>,
@ -165,7 +189,7 @@ where
qualified_tools
}
#[derive(Clone)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct ToolInfo {
pub(crate) server_name: String,
pub(crate) tool_name: String,
@ -174,14 +198,40 @@ pub(crate) struct ToolInfo {
pub(crate) connector_name: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct CodexAppsToolsCacheKey {
account_id: Option<String>,
chatgpt_user_id: Option<String>,
is_workspace_account: bool,
}
#[derive(Clone)]
struct CachedCodexAppsTools {
expires_at: Instant,
struct CodexAppsToolsCacheContext {
codex_home: PathBuf,
user_key: CodexAppsToolsCacheKey,
}
impl CodexAppsToolsCacheContext {
fn cache_path(&self) -> PathBuf {
let user_key_json = serde_json::to_string(&self.user_key).unwrap_or_default();
let user_key_hash = sha1_hex(&user_key_json);
self.codex_home
.join(CODEX_APPS_TOOLS_CACHE_DIR)
.join(format!("{user_key_hash}.json"))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct CodexAppsToolsDiskCache {
schema_version: u8,
tools: Vec<ToolInfo>,
}
static CODEX_APPS_TOOLS_CACHE: LazyLock<StdMutex<Option<CachedCodexAppsTools>>> =
LazyLock::new(|| StdMutex::new(None));
enum CachedCodexAppsToolsLoad {
Hit(Vec<ToolInfo>),
Missing,
Invalid,
}
type ResponderMap = HashMap<(String, RequestId), oneshot::Sender<ElicitationResponse>>;
@ -289,9 +339,35 @@ struct ManagedClient {
tool_filter: ToolFilter,
tool_timeout: Option<Duration>,
server_supports_sandbox_state_capability: bool,
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
impl ManagedClient {
fn listed_tools(&self) -> Vec<ToolInfo> {
let total_start = Instant::now();
if let Some(cache_context) = self.codex_apps_tools_cache_context.as_ref()
&& let CachedCodexAppsToolsLoad::Hit(tools) =
load_cached_codex_apps_tools(cache_context)
{
emit_duration(
MCP_TOOLS_LIST_DURATION_METRIC,
total_start.elapsed(),
&[("cache", "hit")],
);
return filter_tools(tools, &self.tool_filter);
}
if self.codex_apps_tools_cache_context.is_some() {
emit_duration(
MCP_TOOLS_LIST_DURATION_METRIC,
total_start.elapsed(),
&[("cache", "miss")],
);
}
self.tools.clone()
}
/// Returns once the server has ack'd the sandbox state update.
async fn notify_sandbox_state_change(&self, sandbox_state: &SandboxState) -> Result<()> {
if !self.server_supports_sandbox_state_capability {
@ -312,6 +388,8 @@ impl ManagedClient {
#[derive(Clone)]
struct AsyncManagedClient {
client: Shared<BoxFuture<'static, Result<ManagedClient, StartupOutcomeError>>>,
startup_snapshot: Option<Vec<ToolInfo>>,
startup_complete: Arc<AtomicBool>,
}
impl AsyncManagedClient {
@ -322,33 +400,63 @@ impl AsyncManagedClient {
cancel_token: CancellationToken,
tx_event: Sender<Event>,
elicitation_requests: ElicitationRequestManager,
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
) -> Self {
let tool_filter = ToolFilter::from_config(&config);
let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
&server_name,
codex_apps_tools_cache_context.as_ref(),
)
.map(|tools| filter_tools(tools, &tool_filter));
let startup_tool_filter = tool_filter;
let startup_complete = Arc::new(AtomicBool::new(false));
let startup_complete_for_fut = Arc::clone(&startup_complete);
let fut = async move {
if let Err(error) = validate_mcp_server_name(&server_name) {
return Err(error.into());
}
let outcome = async {
if let Err(error) = validate_mcp_server_name(&server_name) {
return Err(error.into());
}
let client =
Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?);
match start_server_task(
server_name,
client,
config.startup_timeout_sec.or(Some(DEFAULT_STARTUP_TIMEOUT)),
config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT),
tool_filter,
tx_event,
elicitation_requests,
)
.or_cancel(&cancel_token)
.await
{
Ok(result) => result,
Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
let client =
Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?);
match start_server_task(
server_name,
client,
StartServerTaskParams {
startup_timeout: config
.startup_timeout_sec
.or(Some(DEFAULT_STARTUP_TIMEOUT)),
tool_timeout: config.tool_timeout_sec.unwrap_or(DEFAULT_TOOL_TIMEOUT),
tool_filter: startup_tool_filter,
tx_event,
elicitation_requests,
codex_apps_tools_cache_context,
},
)
.or_cancel(&cancel_token)
.await
{
Ok(result) => result,
Err(CancelErr::Cancelled) => Err(StartupOutcomeError::Cancelled),
}
}
.await;
startup_complete_for_fut.store(true, Ordering::Release);
outcome
};
let client = fut.boxed().shared();
if startup_snapshot.is_some() {
let startup_task = client.clone();
tokio::spawn(async move {
let _ = startup_task.await;
});
}
Self {
client: fut.boxed().shared(),
client,
startup_snapshot,
startup_complete,
}
}
@ -356,6 +464,24 @@ impl AsyncManagedClient {
self.client.clone().await
}
fn startup_snapshot_while_initializing(&self) -> Option<Vec<ToolInfo>> {
if !self.startup_complete.load(Ordering::Acquire) {
return self.startup_snapshot.clone();
}
None
}
async fn listed_tools(&self) -> Option<Vec<ToolInfo>> {
if let Some(startup_tools) = self.startup_snapshot_while_initializing() {
return Some(startup_tools);
}
match self.client().await {
Ok(client) => Some(client.listed_tools()),
Err(_) => self.startup_snapshot.clone(),
}
}
async fn notify_sandbox_state_change(&self, sandbox_state: &SandboxState) -> Result<()> {
let managed = self.client().await?;
managed.notify_sandbox_state_change(sandbox_state).await
@ -417,6 +543,8 @@ impl McpConnectionManager {
approval_policy: &Constrained<AskForApproval>,
tx_event: Sender<Event>,
initial_sandbox_state: SandboxState,
codex_home: PathBuf,
codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
) -> (Self, CancellationToken) {
let cancel_token = CancellationToken::new();
let mut clients = HashMap::new();
@ -433,6 +561,14 @@ impl McpConnectionManager {
},
)
.await;
let codex_apps_tools_cache_context = if server_name == CODEX_APPS_MCP_SERVER_NAME {
Some(CodexAppsToolsCacheContext {
codex_home: codex_home.clone(),
user_key: codex_apps_tools_cache_key.clone(),
})
} else {
None
};
let async_managed_client = AsyncManagedClient::new(
server_name.clone(),
cfg,
@ -440,6 +576,7 @@ impl McpConnectionManager {
cancel_token.clone(),
tx_event.clone(),
elicitation_requests.clone(),
codex_apps_tools_cache_context,
);
clients.insert(server_name.clone(), async_managed_client.clone());
let tx_event = tx_event.clone();
@ -575,29 +712,11 @@ impl McpConnectionManager {
#[instrument(level = "trace", skip_all)]
pub async fn list_all_tools(&self) -> HashMap<String, ToolInfo> {
let mut tools = HashMap::new();
for (server_name, managed_client) in &self.clients {
let client = managed_client.client().await.ok();
if let Some(client) = client {
let rmcp_client = client.client;
let tool_timeout = client.tool_timeout;
let tool_filter = client.tool_filter;
let mut server_tools = client.tools;
if server_name == CODEX_APPS_MCP_SERVER_NAME {
match list_tools_for_client(server_name, &rmcp_client, tool_timeout).await {
Ok(fresh_or_cached_tools) => {
server_tools = fresh_or_cached_tools;
}
Err(err) => {
warn!(
"Failed to refresh tools for MCP server '{server_name}', using startup snapshot: {err:#}"
);
}
}
}
tools.extend(qualify_tools(filter_tools(server_tools, tool_filter)));
}
for managed_client in self.clients.values() {
let Some(server_tools) = managed_client.listed_tools().await else {
continue;
};
tools.extend(qualify_tools(server_tools));
}
tools
}
@ -615,6 +734,8 @@ impl McpConnectionManager {
.await
.context("failed to get client")?;
let list_start = Instant::now();
let fetch_start = Instant::now();
let tools = list_tools_for_client_uncached(
CODEX_APPS_MCP_SERVER_NAME,
&managed_client.client,
@ -624,8 +745,22 @@ impl McpConnectionManager {
.with_context(|| {
format!("failed to refresh tools for MCP server '{CODEX_APPS_MCP_SERVER_NAME}'")
})?;
emit_duration(
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
fetch_start.elapsed(),
&[],
);
write_cached_codex_apps_tools(&tools);
write_cached_codex_apps_tools_if_needed(
CODEX_APPS_MCP_SERVER_NAME,
managed_client.codex_apps_tools_cache_context.as_ref(),
&tools,
);
emit_duration(
MCP_TOOLS_LIST_DURATION_METRIC,
list_start.elapsed(),
&[("cache", "miss")],
);
Ok(())
}
@ -934,7 +1069,7 @@ impl ToolFilter {
}
}
fn filter_tools(tools: Vec<ToolInfo>, filter: ToolFilter) -> Vec<ToolInfo> {
fn filter_tools(tools: Vec<ToolInfo>, filter: &ToolFilter) -> Vec<ToolInfo> {
tools
.into_iter()
.filter(|tool| filter.allows(&tool.tool_name))
@ -1077,14 +1212,17 @@ fn elicitation_capability_for_server(server_name: &str) -> Option<ElicitationCap
async fn start_server_task(
server_name: String,
client: Arc<RmcpClient>,
startup_timeout: Option<Duration>, // TODO: cancel_token should handle this.
tool_timeout: Duration,
tool_filter: ToolFilter,
tx_event: Sender<Event>,
elicitation_requests: ElicitationRequestManager,
params: StartServerTaskParams,
) -> Result<ManagedClient, StartupOutcomeError> {
let StartServerTaskParams {
startup_timeout,
tool_timeout,
tool_filter,
tx_event,
elicitation_requests,
codex_apps_tools_cache_context,
} = params;
let elicitation = elicitation_capability_for_server(&server_name);
let params = InitializeRequestParams {
meta: None,
capabilities: ClientCapabilities {
@ -1113,9 +1251,29 @@ async fn start_server_task(
.await
.map_err(StartupOutcomeError::from)?;
let tools = list_tools_for_client(&server_name, &client, startup_timeout)
let list_start = Instant::now();
let fetch_start = Instant::now();
let tools = list_tools_for_client_uncached(&server_name, &client, startup_timeout)
.await
.map_err(StartupOutcomeError::from)?;
emit_duration(
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
fetch_start.elapsed(),
&[],
);
write_cached_codex_apps_tools_if_needed(
&server_name,
codex_apps_tools_cache_context.as_ref(),
&tools,
);
if server_name == CODEX_APPS_MCP_SERVER_NAME {
emit_duration(
MCP_TOOLS_LIST_DURATION_METRIC,
list_start.elapsed(),
&[("cache", "miss")],
);
}
let tools = filter_tools(tools, &tool_filter);
let server_supports_sandbox_state_capability = initialize_result
.capabilities
@ -1130,11 +1288,21 @@ async fn start_server_task(
tool_timeout: Some(tool_timeout),
tool_filter,
server_supports_sandbox_state_capability,
codex_apps_tools_cache_context,
};
Ok(managed)
}
struct StartServerTaskParams {
startup_timeout: Option<Duration>, // TODO: cancel_token should handle this.
tool_timeout: Duration,
tool_filter: ToolFilter,
tx_event: Sender<Event>,
elicitation_requests: ElicitationRequestManager,
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
}
async fn make_rmcp_client(
server_name: &str,
transport: McpServerTransportConfig,
@ -1179,80 +1347,99 @@ async fn make_rmcp_client(
}
}
async fn list_tools_for_client(
fn write_cached_codex_apps_tools_if_needed(
server_name: &str,
client: &Arc<RmcpClient>,
timeout: Option<Duration>,
) -> Result<Vec<ToolInfo>> {
let total_start = Instant::now();
if server_name == CODEX_APPS_MCP_SERVER_NAME
&& let Some(cached_tools) = read_cached_codex_apps_tools()
{
emit_duration(
MCP_TOOLS_LIST_DURATION_METRIC,
total_start.elapsed(),
&[("cache", "hit")],
);
return Ok(cached_tools);
cache_context: Option<&CodexAppsToolsCacheContext>,
tools: &[ToolInfo],
) {
if server_name != CODEX_APPS_MCP_SERVER_NAME {
return;
}
let fetch_start = Instant::now();
let tools = list_tools_for_client_uncached(server_name, client, timeout).await?;
emit_duration(
MCP_TOOLS_FETCH_UNCACHED_DURATION_METRIC,
fetch_start.elapsed(),
&[],
);
if server_name == CODEX_APPS_MCP_SERVER_NAME {
if let Some(cache_context) = cache_context {
let cache_write_start = Instant::now();
write_cached_codex_apps_tools(&tools);
write_cached_codex_apps_tools(cache_context, tools);
emit_duration(
MCP_TOOLS_CACHE_WRITE_DURATION_METRIC,
cache_write_start.elapsed(),
&[],
);
}
if server_name == CODEX_APPS_MCP_SERVER_NAME {
emit_duration(
MCP_TOOLS_LIST_DURATION_METRIC,
total_start.elapsed(),
&[("cache", "miss")],
);
}
Ok(tools)
}
fn read_cached_codex_apps_tools() -> Option<Vec<ToolInfo>> {
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
let now = Instant::now();
if let Some(cached) = cache_guard.as_ref()
&& now < cached.expires_at
{
return Some(cached.tools.clone());
fn load_startup_cached_codex_apps_tools_snapshot(
server_name: &str,
cache_context: Option<&CodexAppsToolsCacheContext>,
) -> Option<Vec<ToolInfo>> {
if server_name != CODEX_APPS_MCP_SERVER_NAME {
return None;
}
if cache_guard
.as_ref()
.is_some_and(|cached| now >= cached.expires_at)
{
*cache_guard = None;
let cache_context = cache_context?;
match load_cached_codex_apps_tools(cache_context) {
CachedCodexAppsToolsLoad::Hit(tools) => Some(tools),
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None,
}
None
}
fn write_cached_codex_apps_tools(tools: &[ToolInfo]) {
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*cache_guard = Some(CachedCodexAppsTools {
expires_at: Instant::now() + CODEX_APPS_TOOLS_CACHE_TTL,
tools: tools.to_vec(),
});
#[cfg(test)]
fn read_cached_codex_apps_tools(
cache_context: &CodexAppsToolsCacheContext,
) -> Option<Vec<ToolInfo>> {
match load_cached_codex_apps_tools(cache_context) {
CachedCodexAppsToolsLoad::Hit(tools) => Some(tools),
CachedCodexAppsToolsLoad::Missing | CachedCodexAppsToolsLoad::Invalid => None,
}
}
fn load_cached_codex_apps_tools(
cache_context: &CodexAppsToolsCacheContext,
) -> CachedCodexAppsToolsLoad {
let cache_path = cache_context.cache_path();
let bytes = match std::fs::read(cache_path) {
Ok(bytes) => bytes,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return CachedCodexAppsToolsLoad::Missing;
}
Err(_) => return CachedCodexAppsToolsLoad::Invalid,
};
let cache: CodexAppsToolsDiskCache = match serde_json::from_slice(&bytes) {
Ok(cache) => cache,
Err(_) => return CachedCodexAppsToolsLoad::Invalid,
};
if cache.schema_version != CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION {
return CachedCodexAppsToolsLoad::Invalid;
}
CachedCodexAppsToolsLoad::Hit(filter_disallowed_codex_apps_tools(cache.tools))
}
fn write_cached_codex_apps_tools(cache_context: &CodexAppsToolsCacheContext, tools: &[ToolInfo]) {
let cache_path = cache_context.cache_path();
if let Some(parent) = cache_path.parent()
&& std::fs::create_dir_all(parent).is_err()
{
return;
}
let tools = filter_disallowed_codex_apps_tools(tools.to_vec());
let Ok(bytes) = serde_json::to_vec_pretty(&CodexAppsToolsDiskCache {
schema_version: CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION,
tools,
}) else {
return;
};
let _ = std::fs::write(cache_path, bytes);
}
fn filter_disallowed_codex_apps_tools(tools: Vec<ToolInfo>) -> Vec<ToolInfo> {
tools
.into_iter()
.filter(|tool| {
tool.connector_id
.as_deref()
.is_none_or(is_connector_id_allowed)
})
.collect()
}
fn emit_duration(metric: &str, duration: Duration, tags: &[(&str, &str)]) {
@ -1267,7 +1454,7 @@ async fn list_tools_for_client_uncached(
timeout: Option<Duration>,
) -> Result<Vec<ToolInfo>> {
let resp = client.list_tools_with_connector_ids(None, timeout).await?;
Ok(resp
let tools = resp
.tools
.into_iter()
.map(|tool| {
@ -1288,7 +1475,11 @@ async fn list_tools_for_client_uncached(
connector_name,
}
})
.collect())
.collect();
if server_name == CODEX_APPS_MCP_SERVER_NAME {
return Ok(filter_disallowed_codex_apps_tools(tools));
}
Ok(tools)
}
fn validate_mcp_server_name(server_name: &str) -> Result<()> {
@ -1376,6 +1567,7 @@ mod tests {
use rmcp::model::JsonObject;
use std::collections::HashSet;
use std::sync::Arc;
use tempfile::tempdir;
fn create_test_tool(server_name: &str, tool_name: &str) -> ToolInfo {
ToolInfo {
@ -1397,19 +1589,31 @@ mod tests {
}
}
fn with_clean_codex_apps_tools_cache<T>(f: impl FnOnce() -> T) -> T {
let previous_cache = {
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
cache_guard.take()
};
let result = f();
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*cache_guard = previous_cache;
result
fn create_test_tool_with_connector(
server_name: &str,
tool_name: &str,
connector_id: &str,
connector_name: Option<&str>,
) -> ToolInfo {
let mut tool = create_test_tool(server_name, tool_name);
tool.connector_id = Some(connector_id.to_string());
tool.connector_name = connector_name.map(ToOwned::to_owned);
tool
}
fn create_codex_apps_tools_cache_context(
codex_home: PathBuf,
account_id: Option<&str>,
chatgpt_user_id: Option<&str>,
) -> CodexAppsToolsCacheContext {
CodexAppsToolsCacheContext {
codex_home,
user_key: CodexAppsToolsCacheKey {
account_id: account_id.map(ToOwned::to_owned),
chatgpt_user_id: chatgpt_user_id.map(ToOwned::to_owned),
is_workspace_account: false,
},
}
}
#[test]
@ -1586,9 +1790,9 @@ mod tests {
disabled: HashSet::from(["tool_a".to_string()]),
};
let filtered: Vec<_> = filter_tools(server1_tools, server1_filter)
let filtered: Vec<_> = filter_tools(server1_tools, &server1_filter)
.into_iter()
.chain(filter_tools(server2_tools, server2_filter))
.chain(filter_tools(server2_tools, &server2_filter))
.collect();
assert_eq!(filtered.len(), 1);
@ -1598,43 +1802,260 @@ mod tests {
#[test]
fn codex_apps_tools_cache_is_overwritten_by_last_write() {
with_clean_codex_apps_tools_cache(|| {
let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
let codex_home = tempdir().expect("tempdir");
let cache_context = create_codex_apps_tools_cache_context(
codex_home.path().to_path_buf(),
Some("account-one"),
Some("user-one"),
);
let tools_gateway_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
let tools_gateway_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
write_cached_codex_apps_tools(&tools_gateway_1);
let cached_gateway_1 =
read_cached_codex_apps_tools().expect("cache entry exists for first write");
assert_eq!(cached_gateway_1[0].tool_name, "one");
write_cached_codex_apps_tools(&cache_context, &tools_gateway_1);
let cached_gateway_1 = read_cached_codex_apps_tools(&cache_context)
.expect("cache entry exists for first write");
assert_eq!(cached_gateway_1[0].tool_name, "one");
write_cached_codex_apps_tools(&tools_gateway_2);
let cached_gateway_2 =
read_cached_codex_apps_tools().expect("cache entry exists for second write");
assert_eq!(cached_gateway_2[0].tool_name, "two");
});
write_cached_codex_apps_tools(&cache_context, &tools_gateway_2);
let cached_gateway_2 = read_cached_codex_apps_tools(&cache_context)
.expect("cache entry exists for second write");
assert_eq!(cached_gateway_2[0].tool_name, "two");
}
#[test]
fn codex_apps_tools_cache_is_cleared_when_expired() {
with_clean_codex_apps_tools_cache(|| {
let tools = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "stale_tool")];
write_cached_codex_apps_tools(&tools);
fn codex_apps_tools_cache_is_scoped_per_user() {
let codex_home = tempdir().expect("tempdir");
let cache_context_user_1 = create_codex_apps_tools_cache_context(
codex_home.path().to_path_buf(),
Some("account-one"),
Some("user-one"),
);
let cache_context_user_2 = create_codex_apps_tools_cache_context(
codex_home.path().to_path_buf(),
Some("account-two"),
Some("user-two"),
);
let tools_user_1 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")];
let tools_user_2 = vec![create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "two")];
{
let mut cache_guard = CODEX_APPS_TOOLS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
cache_guard.as_mut().expect("cache exists").expires_at =
Instant::now() - Duration::from_secs(1);
}
write_cached_codex_apps_tools(&cache_context_user_1, &tools_user_1);
write_cached_codex_apps_tools(&cache_context_user_2, &tools_user_2);
assert!(read_cached_codex_apps_tools().is_none());
let read_user_1 =
read_cached_codex_apps_tools(&cache_context_user_1).expect("cache entry for user one");
let read_user_2 =
read_cached_codex_apps_tools(&cache_context_user_2).expect("cache entry for user two");
let cache_guard = CODEX_APPS_TOOLS_CACHE
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
assert!(cache_guard.is_none());
});
assert_eq!(read_user_1[0].tool_name, "one");
assert_eq!(read_user_2[0].tool_name, "two");
assert_ne!(
cache_context_user_1.cache_path(),
cache_context_user_2.cache_path(),
"each user should get an isolated cache file"
);
}
#[test]
fn codex_apps_tools_cache_filters_disallowed_connectors() {
let codex_home = tempdir().expect("tempdir");
let cache_context = create_codex_apps_tools_cache_context(
codex_home.path().to_path_buf(),
Some("account-one"),
Some("user-one"),
);
let tools = vec![
create_test_tool_with_connector(
CODEX_APPS_MCP_SERVER_NAME,
"blocked_tool",
"connector_openai_hidden",
Some("Hidden"),
),
create_test_tool_with_connector(
CODEX_APPS_MCP_SERVER_NAME,
"allowed_tool",
"calendar",
Some("Calendar"),
),
];
write_cached_codex_apps_tools(&cache_context, &tools);
let cached =
read_cached_codex_apps_tools(&cache_context).expect("cache entry exists for user");
assert_eq!(cached.len(), 1);
assert_eq!(cached[0].tool_name, "allowed_tool");
assert_eq!(cached[0].connector_id.as_deref(), Some("calendar"));
}
#[test]
fn codex_apps_tools_cache_is_ignored_when_schema_version_mismatches() {
let codex_home = tempdir().expect("tempdir");
let cache_context = create_codex_apps_tools_cache_context(
codex_home.path().to_path_buf(),
Some("account-one"),
Some("user-one"),
);
let cache_path = cache_context.cache_path();
if let Some(parent) = cache_path.parent() {
std::fs::create_dir_all(parent).expect("create parent");
}
let bytes = serde_json::to_vec_pretty(&serde_json::json!({
"schema_version": CODEX_APPS_TOOLS_CACHE_SCHEMA_VERSION + 1,
"tools": [create_test_tool(CODEX_APPS_MCP_SERVER_NAME, "one")],
}))
.expect("serialize");
std::fs::write(cache_path, bytes).expect("write");
assert!(read_cached_codex_apps_tools(&cache_context).is_none());
}
#[test]
fn codex_apps_tools_cache_is_ignored_when_json_is_invalid() {
let codex_home = tempdir().expect("tempdir");
let cache_context = create_codex_apps_tools_cache_context(
codex_home.path().to_path_buf(),
Some("account-one"),
Some("user-one"),
);
let cache_path = cache_context.cache_path();
if let Some(parent) = cache_path.parent() {
std::fs::create_dir_all(parent).expect("create parent");
}
std::fs::write(cache_path, b"{not json").expect("write");
assert!(read_cached_codex_apps_tools(&cache_context).is_none());
}
#[test]
fn startup_cached_codex_apps_tools_loads_from_disk_cache() {
let codex_home = tempdir().expect("tempdir");
let cache_context = create_codex_apps_tools_cache_context(
codex_home.path().to_path_buf(),
Some("account-one"),
Some("user-one"),
);
let cached_tools = vec![create_test_tool(
CODEX_APPS_MCP_SERVER_NAME,
"calendar_search",
)];
write_cached_codex_apps_tools(&cache_context, &cached_tools);
let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
CODEX_APPS_MCP_SERVER_NAME,
Some(&cache_context),
);
let startup_tools = startup_snapshot.expect("expected startup snapshot to load from cache");
assert_eq!(startup_tools.len(), 1);
assert_eq!(startup_tools[0].server_name, CODEX_APPS_MCP_SERVER_NAME);
assert_eq!(startup_tools[0].tool_name, "calendar_search");
}
#[tokio::test]
async fn list_all_tools_uses_startup_snapshot_while_client_is_pending() {
let startup_tools = vec![create_test_tool(
CODEX_APPS_MCP_SERVER_NAME,
"calendar_create_event",
)];
let pending_client =
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
.boxed()
.shared();
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy);
manager.clients.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
AsyncManagedClient {
client: pending_client,
startup_snapshot: Some(startup_tools),
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
},
);
let tools = manager.list_all_tools().await;
let tool = tools
.get("mcp__codex_apps__calendar_create_event")
.expect("tool from startup cache");
assert_eq!(tool.server_name, CODEX_APPS_MCP_SERVER_NAME);
assert_eq!(tool.tool_name, "calendar_create_event");
}
#[tokio::test]
async fn list_all_tools_blocks_while_client_is_pending_without_startup_snapshot() {
let pending_client =
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
.boxed()
.shared();
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy);
manager.clients.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
AsyncManagedClient {
client: pending_client,
startup_snapshot: None,
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
},
);
let timeout_result =
tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await;
assert!(timeout_result.is_err());
}
#[tokio::test]
async fn list_all_tools_does_not_block_when_startup_snapshot_cache_hit_is_empty() {
let pending_client =
futures::future::pending::<Result<ManagedClient, StartupOutcomeError>>()
.boxed()
.shared();
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy);
manager.clients.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
AsyncManagedClient {
client: pending_client,
startup_snapshot: Some(Vec::new()),
startup_complete: Arc::new(std::sync::atomic::AtomicBool::new(false)),
},
);
let timeout_result =
tokio::time::timeout(Duration::from_millis(10), manager.list_all_tools()).await;
let tools = timeout_result.expect("cache-hit startup snapshot should not block");
assert!(tools.is_empty());
}
#[tokio::test]
async fn list_all_tools_uses_startup_snapshot_when_client_startup_fails() {
let startup_tools = vec![create_test_tool(
CODEX_APPS_MCP_SERVER_NAME,
"calendar_create_event",
)];
let failed_client = futures::future::ready::<Result<ManagedClient, StartupOutcomeError>>(
Err(StartupOutcomeError::Failed {
error: "startup failed".to_string(),
}),
)
.boxed()
.shared();
let approval_policy = Constrained::allow_any(AskForApproval::OnFailure);
let mut manager = McpConnectionManager::new_uninitialized(&approval_policy);
let startup_complete = Arc::new(std::sync::atomic::AtomicBool::new(true));
manager.clients.insert(
CODEX_APPS_MCP_SERVER_NAME.to_string(),
AsyncManagedClient {
client: failed_client,
startup_snapshot: Some(startup_tools),
startup_complete,
},
);
let tools = manager.list_all_tools().await;
let tool = tools
.get("mcp__codex_apps__calendar_create_event")
.expect("tool from startup cache");
assert_eq!(tool.server_name, CODEX_APPS_MCP_SERVER_NAME);
assert_eq!(tool.tool_name, "calendar_create_event");
}
#[test]

View file

@ -539,6 +539,7 @@ pub(crate) struct ChatWidget {
mcp_startup_status: Option<HashMap<String, McpStartupStatus>>,
connectors_cache: ConnectorsCacheState,
connectors_prefetch_in_flight: bool,
connectors_force_refetch_pending: bool,
// Queue of interruptive UI events deferred during an active write cycle
interrupts: InterruptManager,
// Accumulates the current reasoning block text to extract a header
@ -2650,6 +2651,7 @@ impl ChatWidget {
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
connectors_prefetch_in_flight: false,
connectors_force_refetch_pending: false,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
@ -2813,6 +2815,7 @@ impl ChatWidget {
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
connectors_prefetch_in_flight: false,
connectors_force_refetch_pending: false,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
@ -2965,6 +2968,7 @@ impl ChatWidget {
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
connectors_prefetch_in_flight: false,
connectors_force_refetch_pending: false,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
@ -4601,7 +4605,13 @@ impl ChatWidget {
}
fn prefetch_connectors_with_options(&mut self, force_refetch: bool) {
if !self.connectors_enabled() || self.connectors_prefetch_in_flight {
if !self.connectors_enabled() {
return;
}
if self.connectors_prefetch_in_flight {
if force_refetch {
self.connectors_force_refetch_pending = true;
}
return;
}
@ -4613,8 +4623,8 @@ impl ChatWidget {
let config = self.config.clone();
let app_event_tx = self.app_event_tx.clone();
tokio::spawn(async move {
let accessible_connectors =
match connectors::list_accessible_connectors_from_mcp_tools_with_options(
let accessible_result =
match connectors::list_accessible_connectors_from_mcp_tools_with_options_and_status(
&config,
force_refetch,
)
@ -4629,6 +4639,9 @@ impl ChatWidget {
return;
}
};
let should_schedule_force_refetch =
!force_refetch && !accessible_result.codex_apps_ready;
let accessible_connectors = accessible_result.connectors;
app_event_tx.send(AppEvent::ConnectorsLoaded {
result: Ok(ConnectorsSnapshot {
@ -4638,7 +4651,8 @@ impl ChatWidget {
});
let result: Result<ConnectorsSnapshot, String> = async {
let all_connectors = connectors::list_all_connectors(&config).await?;
let all_connectors =
connectors::list_all_connectors_with_options(&config, force_refetch).await?;
let connectors = connectors::merge_connectors_with_accessible(
all_connectors,
accessible_connectors,
@ -4653,6 +4667,12 @@ impl ChatWidget {
result,
is_final: true,
});
if should_schedule_force_refetch {
app_event_tx.send(AppEvent::RefreshConnectors {
force_refetch: true,
});
}
});
}
@ -6821,8 +6841,13 @@ impl ChatWidget {
result: Result<ConnectorsSnapshot, String>,
is_final: bool,
) {
let mut trigger_pending_force_refetch = false;
if is_final {
self.connectors_prefetch_in_flight = false;
if self.connectors_force_refetch_pending {
self.connectors_force_refetch_pending = false;
trigger_pending_force_refetch = true;
}
}
match result {
@ -6857,13 +6882,16 @@ impl ChatWidget {
Err(err) => {
if matches!(self.connectors_cache, ConnectorsCacheState::Ready(_)) {
warn!("failed to refresh apps list; retaining current apps snapshot: {err}");
return;
} else {
self.connectors_cache = ConnectorsCacheState::Failed(err);
self.bottom_pane.set_connectors_snapshot(None);
}
self.connectors_cache = ConnectorsCacheState::Failed(err);
self.bottom_pane.set_connectors_snapshot(None);
}
}
if trigger_pending_force_refetch {
self.prefetch_connectors_with_options(true);
}
}
pub(crate) fn update_connector_enabled(&mut self, connector_id: &str, enabled: bool) {

View file

@ -1627,6 +1627,7 @@ async fn make_chatwidget_manual(
mcp_startup_status: None,
connectors_cache: ConnectorsCacheState::default(),
connectors_prefetch_in_flight: false,
connectors_force_refetch_pending: false,
interrupts: InterruptManager::new(),
reasoning_buffer: String::new(),
full_reasoning_buffer: String::new(),
@ -4582,6 +4583,42 @@ async fn apps_refresh_failure_keeps_existing_full_snapshot() {
);
}
#[tokio::test]
async fn apps_refresh_failure_with_cached_snapshot_triggers_pending_force_refetch() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;
chat.config.features.enable(Feature::Apps);
chat.bottom_pane.set_connectors_enabled(true);
chat.connectors_prefetch_in_flight = true;
chat.connectors_force_refetch_pending = true;
let full_connectors = vec![codex_chatgpt::connectors::AppInfo {
id: "unit_test_apps_refresh_failure_pending_connector".to_string(),
name: "Notion".to_string(),
description: Some("Workspace docs".to_string()),
logo_url: None,
logo_url_dark: None,
distribution_channel: None,
branding: None,
app_metadata: None,
labels: None,
install_url: Some("https://example.test/notion".to_string()),
is_accessible: true,
is_enabled: true,
}];
chat.connectors_cache = ConnectorsCacheState::Ready(ConnectorsSnapshot {
connectors: full_connectors.clone(),
});
chat.on_connectors_loaded(Err("failed to load apps".to_string()), true);
assert!(chat.connectors_prefetch_in_flight);
assert!(!chat.connectors_force_refetch_pending);
assert_matches!(
&chat.connectors_cache,
ConnectorsCacheState::Ready(snapshot) if snapshot.connectors == full_connectors
);
}
#[tokio::test]
async fn apps_partial_refresh_uses_same_filtering_as_full_refresh() {
let (mut chat, _rx, _op_rx) = make_chatwidget_manual(None).await;