chore: consolidate new() and initialize() for McpConnectionManager (#12255)
## Why `McpConnectionManager` used a two-phase setup (`new()` followed by `initialize()`), which forced call sites to construct placeholder state and then mutate it asynchronously. That made MCP startup/refresh flows harder to follow and easier to misuse, especially around cancellation token ownership. ## What changed - Replaced the two-phase initialization flow with a single async constructor: `McpConnectionManager::new(...) -> (Self, CancellationToken)`. - Added `McpConnectionManager::new_uninitialized()` for places that need an empty manager before async startup begins. - Added `McpConnectionManager::new_mcp_connection_manager_for_tests()` for test-only construction. - Updated MCP startup and refresh call sites in `codex-rs/core/src/codex.rs` to build a fresh manager via `new(...)`, swap it in, and update the startup cancellation token consistently. - Updated MCP snapshot/connector call sites in `codex-rs/core/src/mcp/mod.rs` and `codex-rs/core/src/connectors.rs` to use the consolidated constructor. - Removed the now-obsolete `reset_mcp_startup_cancellation_token()` helper in favor of explicit token replacement at the call sites. ## Testing - Not run (refactor-only change; no new behavior was intended).
This commit is contained in:
parent
9719dc502c
commit
7cd2e84026
4 changed files with 99 additions and 75 deletions
|
|
@ -1281,7 +1281,16 @@ impl Session {
|
|||
.await;
|
||||
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
// Initialize the MCP connection manager with an uninitialized
|
||||
// instance. It will be replaced with one created via
|
||||
// McpConnectionManager::new() once all its constructor args are
|
||||
// available. This also ensures `SessionConfigured` is emitted
|
||||
// before any MCP-related events. It is reasonable to consider
|
||||
// changing this to use Option or OnceCell, though the current
|
||||
// setup is straightforward enough and performs well.
|
||||
mcp_connection_manager: Arc::new(
|
||||
RwLock::new(McpConnectionManager::new_uninitialized()),
|
||||
),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
|
|
@ -1385,7 +1394,7 @@ impl Session {
|
|||
// Start the watcher after SessionConfigured so it cannot emit earlier events.
|
||||
sess.start_file_watcher_listener();
|
||||
|
||||
// Construct sandbox_state before initialize() so it can be sent to each
|
||||
// Construct sandbox_state before MCP startup so it can be sent to each
|
||||
// MCP server immediately after it becomes ready (avoiding blocking).
|
||||
let sandbox_state = SandboxState {
|
||||
sandbox_policy: session_configuration.sandbox_policy.get().clone(),
|
||||
|
|
@ -1399,21 +1408,30 @@ impl Session {
|
|||
.map(|(name, _)| name.clone())
|
||||
.collect();
|
||||
required_mcp_servers.sort();
|
||||
let cancel_token = sess.mcp_startup_cancellation_token().await;
|
||||
|
||||
sess.services
|
||||
.mcp_connection_manager
|
||||
.write()
|
||||
.await
|
||||
.initialize(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_statuses.clone(),
|
||||
tx_event.clone(),
|
||||
cancel_token,
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
cancel_guard.cancel();
|
||||
*cancel_guard = CancellationToken::new();
|
||||
}
|
||||
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_statuses.clone(),
|
||||
tx_event.clone(),
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let mut manager_guard = sess.services.mcp_connection_manager.write().await;
|
||||
*manager_guard = mcp_connection_manager;
|
||||
}
|
||||
{
|
||||
let mut cancel_guard = sess.services.mcp_startup_cancellation_token.lock().await;
|
||||
if cancel_guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*cancel_guard = cancel_token;
|
||||
}
|
||||
if !required_mcp_servers.is_empty() {
|
||||
let failures = sess
|
||||
.services
|
||||
|
|
@ -3032,19 +3050,26 @@ impl Session {
|
|||
sandbox_cwd: turn_context.cwd.clone(),
|
||||
use_linux_sandbox_bwrap: turn_context.features.enabled(Feature::UseLinuxSandboxBwrap),
|
||||
};
|
||||
let cancel_token = self.reset_mcp_startup_cancellation_token().await;
|
||||
|
||||
let mut refreshed_manager = McpConnectionManager::default();
|
||||
refreshed_manager
|
||||
.initialize(
|
||||
&mcp_servers,
|
||||
store_mode,
|
||||
auth_statuses,
|
||||
self.get_tx_event(),
|
||||
cancel_token,
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let mut cancel_guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
cancel_guard.cancel();
|
||||
*cancel_guard = CancellationToken::new();
|
||||
}
|
||||
let (refreshed_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
store_mode,
|
||||
auth_statuses,
|
||||
self.get_tx_event(),
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
{
|
||||
let mut cancel_guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
if cancel_guard.is_cancelled() {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
*cancel_guard = cancel_token;
|
||||
}
|
||||
|
||||
let mut manager = self.services.mcp_connection_manager.write().await;
|
||||
*manager = refreshed_manager;
|
||||
|
|
@ -3093,6 +3118,7 @@ impl Session {
|
|||
.await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
async fn mcp_startup_cancellation_token(&self) -> CancellationToken {
|
||||
self.services
|
||||
.mcp_startup_cancellation_token
|
||||
|
|
@ -3101,14 +3127,6 @@ impl Session {
|
|||
.clone()
|
||||
}
|
||||
|
||||
async fn reset_mcp_startup_cancellation_token(&self) -> CancellationToken {
|
||||
let mut guard = self.services.mcp_startup_cancellation_token.lock().await;
|
||||
guard.cancel();
|
||||
let cancel_token = CancellationToken::new();
|
||||
*guard = cancel_token.clone();
|
||||
cancel_token
|
||||
}
|
||||
|
||||
fn show_raw_agent_reasoning(&self) -> bool {
|
||||
self.services.show_raw_agent_reasoning
|
||||
}
|
||||
|
|
@ -7364,7 +7382,9 @@ mod tests {
|
|||
|
||||
let file_watcher = Arc::new(FileWatcher::noop());
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_connection_manager: Arc::new(RwLock::new(
|
||||
McpConnectionManager::new_mcp_connection_manager_for_tests(),
|
||||
)),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
|
|
@ -7516,7 +7536,9 @@ mod tests {
|
|||
|
||||
let file_watcher = Arc::new(FileWatcher::noop());
|
||||
let services = SessionServices {
|
||||
mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())),
|
||||
mcp_connection_manager: Arc::new(RwLock::new(
|
||||
McpConnectionManager::new_mcp_connection_manager_for_tests(),
|
||||
)),
|
||||
mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()),
|
||||
unified_exec_manager: UnifiedExecProcessManager::new(
|
||||
config.background_terminal_max_timeout,
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ pub use codex_app_server_protocol::AppInfo;
|
|||
pub use codex_app_server_protocol::AppMetadata;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use serde::Deserialize;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tracing::warn;
|
||||
|
||||
use crate::AuthManager;
|
||||
|
|
@ -91,10 +90,8 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options(
|
|||
let auth_status_entries =
|
||||
compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode).await;
|
||||
|
||||
let mut mcp_connection_manager = McpConnectionManager::default();
|
||||
let (tx_event, rx_event) = unbounded();
|
||||
drop(rx_event);
|
||||
let cancel_token = CancellationToken::new();
|
||||
|
||||
let sandbox_state = SandboxState {
|
||||
sandbox_policy: SandboxPolicy::new_read_only_policy(),
|
||||
|
|
@ -103,16 +100,14 @@ pub async fn list_accessible_connectors_from_mcp_tools_with_options(
|
|||
use_linux_sandbox_bwrap: config.features.enabled(Feature::UseLinuxSandboxBwrap),
|
||||
};
|
||||
|
||||
mcp_connection_manager
|
||||
.initialize(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_status_entries,
|
||||
tx_event,
|
||||
cancel_token.clone(),
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_status_entries,
|
||||
tx_event,
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
|
||||
if force_refetch
|
||||
&& let Err(err) = mcp_connection_manager
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ use codex_protocol::mcp::Tool;
|
|||
use codex_protocol::protocol::McpListToolsResponseEvent;
|
||||
use codex_protocol::protocol::SandboxPolicy;
|
||||
use serde_json::Value;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::AuthManager;
|
||||
use crate::CodexAuth;
|
||||
|
|
@ -191,10 +190,8 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent
|
|||
let auth_status_entries =
|
||||
compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode).await;
|
||||
|
||||
let mut mcp_connection_manager = McpConnectionManager::default();
|
||||
let (tx_event, rx_event) = unbounded();
|
||||
drop(rx_event);
|
||||
let cancel_token = CancellationToken::new();
|
||||
|
||||
// Use ReadOnly sandbox policy for MCP snapshot collection (safest default)
|
||||
let sandbox_state = SandboxState {
|
||||
|
|
@ -204,16 +201,14 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent
|
|||
use_linux_sandbox_bwrap: config.features.enabled(Feature::UseLinuxSandboxBwrap),
|
||||
};
|
||||
|
||||
mcp_connection_manager
|
||||
.initialize(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_status_entries.clone(),
|
||||
tx_event,
|
||||
cancel_token.clone(),
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
let (mcp_connection_manager, cancel_token) = McpConnectionManager::new(
|
||||
&mcp_servers,
|
||||
config.mcp_oauth_credentials_store_mode,
|
||||
auth_status_entries.clone(),
|
||||
tx_event,
|
||||
sandbox_state,
|
||||
)
|
||||
.await;
|
||||
|
||||
let snapshot =
|
||||
collect_mcp_snapshot_from_manager(&mcp_connection_manager, auth_status_entries).await;
|
||||
|
|
|
|||
|
|
@ -353,22 +353,31 @@ pub(crate) struct McpConnectionManager {
|
|||
}
|
||||
|
||||
impl McpConnectionManager {
|
||||
pub(crate) fn new_uninitialized() -> Self {
|
||||
Self {
|
||||
clients: HashMap::new(),
|
||||
elicitation_requests: ElicitationRequestManager::default(),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) fn new_mcp_connection_manager_for_tests() -> Self {
|
||||
Self::new_uninitialized()
|
||||
}
|
||||
|
||||
pub(crate) fn has_servers(&self) -> bool {
|
||||
!self.clients.is_empty()
|
||||
}
|
||||
|
||||
pub async fn initialize(
|
||||
&mut self,
|
||||
#[allow(clippy::new_ret_no_self)]
|
||||
pub async fn new(
|
||||
mcp_servers: &HashMap<String, McpServerConfig>,
|
||||
store_mode: OAuthCredentialsStoreMode,
|
||||
auth_entries: HashMap<String, McpAuthStatusEntry>,
|
||||
tx_event: Sender<Event>,
|
||||
cancel_token: CancellationToken,
|
||||
initial_sandbox_state: SandboxState,
|
||||
) {
|
||||
if cancel_token.is_cancelled() {
|
||||
return;
|
||||
}
|
||||
) -> (Self, CancellationToken) {
|
||||
let cancel_token = CancellationToken::new();
|
||||
let mut clients = HashMap::new();
|
||||
let mut join_set = JoinSet::new();
|
||||
let elicitation_requests = ElicitationRequestManager::default();
|
||||
|
|
@ -435,8 +444,10 @@ impl McpConnectionManager {
|
|||
(server_name, outcome)
|
||||
});
|
||||
}
|
||||
self.clients = clients;
|
||||
self.elicitation_requests = elicitation_requests.clone();
|
||||
let manager = Self {
|
||||
clients,
|
||||
elicitation_requests: elicitation_requests.clone(),
|
||||
};
|
||||
tokio::spawn(async move {
|
||||
let outcomes = join_set.join_all().await;
|
||||
let mut summary = McpStartupCompleteEvent::default();
|
||||
|
|
@ -459,6 +470,7 @@ impl McpConnectionManager {
|
|||
})
|
||||
.await;
|
||||
});
|
||||
(manager, cancel_token)
|
||||
}
|
||||
|
||||
async fn client_by_name(&self, name: &str) -> Result<ManagedClient> {
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue