From 7cd2e840262ff2abeaafe0c836a1834f3f8aa55b Mon Sep 17 00:00:00 2001 From: Michael Bolin Date: Thu, 19 Feb 2026 10:59:51 -0800 Subject: [PATCH] chore: consolidate new() and initialize() for McpConnectionManager (#12255) ## Why `McpConnectionManager` used a two-phase setup (`new()` followed by `initialize()`), which forced call sites to construct placeholder state and then mutate it asynchronously. That made MCP startup/refresh flows harder to follow and easier to misuse, especially around cancellation token ownership. ## What changed - Replaced the two-phase initialization flow with a single async constructor: `McpConnectionManager::new(...) -> (Self, CancellationToken)`. - Added `McpConnectionManager::new_uninitialized()` for places that need an empty manager before async startup begins. - Added `McpConnectionManager::new_mcp_connection_manager_for_tests()` for test-only construction. - Updated MCP startup and refresh call sites in `codex-rs/core/src/codex.rs` to build a fresh manager via `new(...)`, swap it in, and update the startup cancellation token consistently. - Updated MCP snapshot/connector call sites in `codex-rs/core/src/mcp/mod.rs` and `codex-rs/core/src/connectors.rs` to use the consolidated constructor. - Removed the now-obsolete `reset_mcp_startup_cancellation_token()` helper in favor of explicit token replacement at the call sites. ## Testing - Not run (refactor-only change; no new behavior was intended). --- codex-rs/core/src/codex.rs | 102 ++++++++++++-------- codex-rs/core/src/connectors.rs | 21 ++-- codex-rs/core/src/mcp/mod.rs | 21 ++-- codex-rs/core/src/mcp_connection_manager.rs | 30 ++++-- 4 files changed, 99 insertions(+), 75 deletions(-) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 0ff60c464..21aad3c75 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1281,7 +1281,16 @@ impl Session { .await; let services = SessionServices { - mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), + // Initialize the MCP connection manager with an uninitialized + // instance. It will be replaced with one created via + // McpConnectionManager::new() once all its constructor args are + // available. This also ensures `SessionConfigured` is emitted + // before any MCP-related events. It is reasonable to consider + // changing this to use Option or OnceCell, though the current + // setup is straightforward enough and performs well. + mcp_connection_manager: Arc::new( + RwLock::new(McpConnectionManager::new_uninitialized()), + ), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -1385,7 +1394,7 @@ impl Session { // Start the watcher after SessionConfigured so it cannot emit earlier events. sess.start_file_watcher_listener(); - // Construct sandbox_state before initialize() so it can be sent to each + // Construct sandbox_state before MCP startup so it can be sent to each // MCP server immediately after it becomes ready (avoiding blocking). let sandbox_state = SandboxState { sandbox_policy: session_configuration.sandbox_policy.get().clone(), @@ -1399,21 +1408,30 @@ impl Session { .map(|(name, _)| name.clone()) .collect(); required_mcp_servers.sort(); - let cancel_token = sess.mcp_startup_cancellation_token().await; - - sess.services - .mcp_connection_manager - .write() - .await - .initialize( - &mcp_servers, - config.mcp_oauth_credentials_store_mode, - auth_statuses.clone(), - tx_event.clone(), - cancel_token, - sandbox_state, - ) - .await; + { + let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; + cancel_guard.cancel(); + *cancel_guard = CancellationToken::new(); + } + let (mcp_connection_manager, cancel_token) = McpConnectionManager::new( + &mcp_servers, + config.mcp_oauth_credentials_store_mode, + auth_statuses.clone(), + tx_event.clone(), + sandbox_state, + ) + .await; + { + let mut manager_guard = sess.services.mcp_connection_manager.write().await; + *manager_guard = mcp_connection_manager; + } + { + let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await; + if cancel_guard.is_cancelled() { + cancel_token.cancel(); + } + *cancel_guard = cancel_token; + } if !required_mcp_servers.is_empty() { let failures = sess .services @@ -3032,19 +3050,26 @@ impl Session { sandbox_cwd: turn_context.cwd.clone(), use_linux_sandbox_bwrap: turn_context.features.enabled(Feature::UseLinuxSandboxBwrap), }; - let cancel_token = self.reset_mcp_startup_cancellation_token().await; - - let mut refreshed_manager = McpConnectionManager::default(); - refreshed_manager - .initialize( - &mcp_servers, - store_mode, - auth_statuses, - self.get_tx_event(), - cancel_token, - sandbox_state, - ) - .await; + { + let mut cancel_guard = self.services.mcp_startup_cancellation_token.lock().await; + cancel_guard.cancel(); + *cancel_guard = CancellationToken::new(); + } + let (refreshed_manager, cancel_token) = McpConnectionManager::new( + &mcp_servers, + store_mode, + auth_statuses, + self.get_tx_event(), + sandbox_state, + ) + .await; + { + let mut cancel_guard = self.services.mcp_startup_cancellation_token.lock().await; + if cancel_guard.is_cancelled() { + cancel_token.cancel(); + } + *cancel_guard = cancel_token; + } let mut manager = self.services.mcp_connection_manager.write().await; *manager = refreshed_manager; @@ -3093,6 +3118,7 @@ impl Session { .await; } + #[cfg(test)] async fn mcp_startup_cancellation_token(&self) -> CancellationToken { self.services .mcp_startup_cancellation_token @@ -3101,14 +3127,6 @@ impl Session { .clone() } - async fn reset_mcp_startup_cancellation_token(&self) -> CancellationToken { - let mut guard = self.services.mcp_startup_cancellation_token.lock().await; - guard.cancel(); - let cancel_token = CancellationToken::new(); - *guard = cancel_token.clone(); - cancel_token - } - fn show_raw_agent_reasoning(&self) -> bool { self.services.show_raw_agent_reasoning } @@ -7364,7 +7382,9 @@ mod tests { let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { - mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), + mcp_connection_manager: Arc::new(RwLock::new( + McpConnectionManager::new_mcp_connection_manager_for_tests(), + )), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, @@ -7516,7 +7536,9 @@ mod tests { let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { - mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), + mcp_connection_manager: Arc::new(RwLock::new( + McpConnectionManager::new_mcp_connection_manager_for_tests(), + )), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), unified_exec_manager: UnifiedExecProcessManager::new( config.background_terminal_max_timeout, diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs index 5a8930231..ebad8cd3b 100644 --- a/codex-rs/core/src/connectors.rs +++ b/codex-rs/core/src/connectors.rs @@ -12,7 +12,6 @@ pub use codex_app_server_protocol::AppInfo; pub use codex_app_server_protocol::AppMetadata; use codex_protocol::protocol::SandboxPolicy; use serde::Deserialize; -use tokio_util::sync::CancellationToken; use tracing::warn; use crate::AuthManager; @@ -91,10 +90,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options( let auth_status_entries = compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode).await; - let mut mcp_connection_manager = McpConnectionManager::default(); let (tx_event, rx_event) = unbounded(); drop(rx_event); - let cancel_token = CancellationToken::new(); let sandbox_state = SandboxState { sandbox_policy: SandboxPolicy::new_read_only_policy(), @@ -103,16 +100,14 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options( use_linux_sandbox_bwrap: config.features.enabled(Feature::UseLinuxSandboxBwrap), }; - mcp_connection_manager - .initialize( - &mcp_servers, - config.mcp_oauth_credentials_store_mode, - auth_status_entries, - tx_event, - cancel_token.clone(), - sandbox_state, - ) - .await; + let (mcp_connection_manager, cancel_token) = McpConnectionManager::new( + &mcp_servers, + config.mcp_oauth_credentials_store_mode, + auth_status_entries, + tx_event, + sandbox_state, + ) + .await; if force_refetch && let Err(err) = mcp_connection_manager diff --git a/codex-rs/core/src/mcp/mod.rs b/codex-rs/core/src/mcp/mod.rs index 1365b5da8..84b979499 100644 --- a/codex-rs/core/src/mcp/mod.rs +++ b/codex-rs/core/src/mcp/mod.rs @@ -14,7 +14,6 @@ use codex_protocol::mcp::Tool; use codex_protocol::protocol::McpListToolsResponseEvent; use codex_protocol::protocol::SandboxPolicy; use serde_json::Value; -use tokio_util::sync::CancellationToken; use crate::AuthManager; use crate::CodexAuth; @@ -191,10 +190,8 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent let auth_status_entries = compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode).await; - let mut mcp_connection_manager = McpConnectionManager::default(); let (tx_event, rx_event) = unbounded(); drop(rx_event); - let cancel_token = CancellationToken::new(); // Use ReadOnly sandbox policy for MCP snapshot collection (safest default) let sandbox_state = SandboxState { @@ -204,16 +201,14 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent use_linux_sandbox_bwrap: config.features.enabled(Feature::UseLinuxSandboxBwrap), }; - mcp_connection_manager - .initialize( - &mcp_servers, - config.mcp_oauth_credentials_store_mode, - auth_status_entries.clone(), - tx_event, - cancel_token.clone(), - sandbox_state, - ) - .await; + let (mcp_connection_manager, cancel_token) = McpConnectionManager::new( + &mcp_servers, + config.mcp_oauth_credentials_store_mode, + auth_status_entries.clone(), + tx_event, + sandbox_state, + ) + .await; let snapshot = collect_mcp_snapshot_from_manager(&mcp_connection_manager, auth_status_entries).await; diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index 62c4dedd4..0fe2ac8a8 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -353,22 +353,31 @@ pub(crate) struct McpConnectionManager { } impl McpConnectionManager { + pub(crate) fn new_uninitialized() -> Self { + Self { + clients: HashMap::new(), + elicitation_requests: ElicitationRequestManager::default(), + } + } + + #[cfg(test)] + pub(crate) fn new_mcp_connection_manager_for_tests() -> Self { + Self::new_uninitialized() + } + pub(crate) fn has_servers(&self) -> bool { !self.clients.is_empty() } - pub async fn initialize( - &mut self, + #[allow(clippy::new_ret_no_self)] + pub async fn new( mcp_servers: &HashMap, store_mode: OAuthCredentialsStoreMode, auth_entries: HashMap, tx_event: Sender, - cancel_token: CancellationToken, initial_sandbox_state: SandboxState, - ) { - if cancel_token.is_cancelled() { - return; - } + ) -> (Self, CancellationToken) { + let cancel_token = CancellationToken::new(); let mut clients = HashMap::new(); let mut join_set = JoinSet::new(); let elicitation_requests = ElicitationRequestManager::default(); @@ -435,8 +444,10 @@ impl McpConnectionManager { (server_name, outcome) }); } - self.clients = clients; - self.elicitation_requests = elicitation_requests.clone(); + let manager = Self { + clients, + elicitation_requests: elicitation_requests.clone(), + }; tokio::spawn(async move { let outcomes = join_set.join_all().await; let mut summary = McpStartupCompleteEvent::default(); @@ -459,6 +470,7 @@ impl McpConnectionManager { }) .await; }); + (manager, cancel_token) } async fn client_by_name(&self, name: &str) -> Result {