From a2c829a808032b6f210f8c238dddc9815cca9c33 Mon Sep 17 00:00:00 2001 From: Matthew Zeng Date: Thu, 22 Jan 2026 16:48:43 -0800 Subject: [PATCH] [connectors] Support connectors part 1 - App server & MCP (#9667) In order to make Codex work with connectors, we add a built-in gateway MCP that acts as a transparent proxy between the client and the connectors. The gateway MCP collects actions that are accessible to the user and sends them down to the user, when a connector action is chosen to be called, the client invokes the action through the gateway MCP as well. - [x] Add the system built-in gateway MCP to list and run connectors. - [x] Add the app server methods and protocol --- codex-rs/Cargo.lock | 5 + .../src/protocol/common.rs | 4 + .../app-server-protocol/src/protocol/v2.rs | 33 ++ codex-rs/app-server/Cargo.toml | 10 + codex-rs/app-server/README.md | 1 + .../app-server/src/codex_message_processor.rs | 103 +++++ .../app-server/tests/common/auth_fixtures.rs | 34 +- .../app-server/tests/common/mcp_process.rs | 7 + .../app-server/tests/suite/v2/app_list.rs | 381 ++++++++++++++++++ codex-rs/app-server/tests/suite/v2/mod.rs | 1 + codex-rs/chatgpt/src/chatgpt_client.rs | 35 ++ codex-rs/chatgpt/src/connectors.rs | 125 ++++++ codex-rs/chatgpt/src/lib.rs | 1 + codex-rs/core/config.schema.json | 6 + codex-rs/core/src/auth.rs | 1 + codex-rs/core/src/codex.rs | 120 +++++- codex-rs/core/src/connectors.rs | 227 +++++++++++ codex-rs/core/src/features.rs | 8 + codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/mcp/mod.rs | 130 +++++- codex-rs/core/src/mcp_connection_manager.rs | 12 +- codex-rs/core/src/token_data.rs | 8 + codex-rs/rmcp-client/src/lib.rs | 2 + codex-rs/rmcp-client/src/rmcp_client.rs | 55 ++- docs/config.md | 6 + 25 files changed, 1288 insertions(+), 28 deletions(-) create mode 100644 codex-rs/app-server/tests/suite/v2/app_list.rs create mode 100644 codex-rs/chatgpt/src/connectors.rs create mode 100644 codex-rs/core/src/connectors.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index f97139523..c8fc0223f 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -625,6 +625,8 @@ dependencies = [ "pin-project-lite", "rustversion", "serde", + "serde_json", + "serde_path_to_error", "sync_wrapper", "tokio", "tower", @@ -1000,11 +1002,13 @@ version = "0.0.0" dependencies = [ "anyhow", "app_test_support", + "axum", "base64", "chrono", "codex-app-server-protocol", "codex-arg0", "codex-backend-client", + "codex-chatgpt", "codex-common", "codex-core", "codex-feedback", @@ -1018,6 +1022,7 @@ dependencies = [ "mcp-types", "os_info", "pretty_assertions", + "rmcp", "serde", "serde_json", "serial_test", diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index c6385b223..fcc35a60d 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -137,6 +137,10 @@ client_request_definitions! { params: v2::SkillsListParams, response: v2::SkillsListResponse, }, + AppsList => "app/list" { + params: v2::AppsListParams, + response: v2::AppsListResponse, + }, SkillsConfigWrite => "skills/config/write" { params: v2::SkillsConfigWriteParams, response: v2::SkillsConfigWriteResponse, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 66654e051..022532a05 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -965,6 +965,39 @@ pub struct ListMcpServerStatusResponse { pub next_cursor: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct AppsListParams { + /// Opaque pagination cursor returned by a previous call. + pub cursor: Option, + /// Optional page size; defaults to a reasonable server-side value. + pub limit: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct AppInfo { + pub id: String, + pub name: String, + pub description: Option, + pub logo_url: Option, + pub install_url: Option, + #[serde(default)] + pub is_accessible: bool, +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct AppsListResponse { + pub data: Vec, + /// Opaque cursor to pass to the next call to continue after the last item. + /// If None, there are no more items to return. + pub next_cursor: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server/Cargo.toml b/codex-rs/app-server/Cargo.toml index fbe9150a1..f31df68b3 100644 --- a/codex-rs/app-server/Cargo.toml +++ b/codex-rs/app-server/Cargo.toml @@ -22,6 +22,7 @@ codex-common = { workspace = true, features = ["cli"] } codex-core = { workspace = true } codex-backend-client = { workspace = true } codex-file-search = { workspace = true } +codex-chatgpt = { workspace = true } codex-login = { workspace = true } codex-protocol = { workspace = true } codex-app-server-protocol = { workspace = true } @@ -48,11 +49,20 @@ uuid = { workspace = true, features = ["serde", "v7"] } [dev-dependencies] app_test_support = { workspace = true } +axum = { workspace = true, default-features = false, features = [ + "http1", + "json", + "tokio", +] } base64 = { workspace = true } core_test_support = { workspace = true } mcp-types = { workspace = true } os_info = { workspace = true } pretty_assertions = { workspace = true } +rmcp = { workspace = true, default-features = false, features = [ + "server", + "transport-streamable-http-server", +] } serial_test = { workspace = true } wiremock = { workspace = true } shlex = { workspace = true } diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index aef925373..c990b1ac3 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -89,6 +89,7 @@ Example (from OpenAI's official VSCode extension): - `model/list` — list available models (with reasoning effort options). - `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination). - `skills/list` — list skills for one or more `cwd` values (optional `forceReload`). +- `app/list` — list available apps. - `skills/config/write` — write user-level skill config by path. - `mcpServer/oauth/login` — start an OAuth login for a configured MCP server; returns an `authorization_url` and later emits `mcpServer/oauthLogin/completed` once the browser flow finishes. - `tool/requestUserInput` — prompt the user with 1–3 short questions for a tool call and return their answers (experimental). diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 36611d157..62fceb34d 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -13,6 +13,9 @@ use codex_app_server_protocol::AccountLoginCompletedNotification; use codex_app_server_protocol::AccountUpdatedNotification; use codex_app_server_protocol::AddConversationListenerParams; use codex_app_server_protocol::AddConversationSubscriptionResponse; +use codex_app_server_protocol::AppInfo as ApiAppInfo; +use codex_app_server_protocol::AppsListParams; +use codex_app_server_protocol::AppsListResponse; use codex_app_server_protocol::ArchiveConversationParams; use codex_app_server_protocol::ArchiveConversationResponse; use codex_app_server_protocol::AskForApproval; @@ -122,6 +125,7 @@ use codex_app_server_protocol::UserInput as V2UserInput; use codex_app_server_protocol::UserSavedConfig; use codex_app_server_protocol::build_turns_from_event_msgs; use codex_backend_client::Client as BackendClient; +use codex_chatgpt::connectors; use codex_core::AuthManager; use codex_core::CodexThread; use codex_core::Cursor as RolloutCursor; @@ -411,6 +415,9 @@ impl CodexMessageProcessor { ClientRequest::SkillsList { request_id, params } => { self.skills_list(request_id, params).await; } + ClientRequest::AppsList { request_id, params } => { + self.apps_list(request_id, params).await; + } ClientRequest::SkillsConfigWrite { request_id, params } => { self.skills_config_write(request_id, params).await; } @@ -3406,6 +3413,102 @@ impl CodexMessageProcessor { .await; } + async fn apps_list(&self, request_id: RequestId, params: AppsListParams) { + let AppsListParams { cursor, limit } = params; + let config = match self.load_latest_config().await { + Ok(config) => config, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + if !config.features.enabled(Feature::Connectors) { + self.outgoing + .send_response( + request_id, + AppsListResponse { + data: Vec::new(), + next_cursor: None, + }, + ) + .await; + return; + } + + let connectors = match connectors::list_connectors(&config).await { + Ok(connectors) => connectors, + Err(err) => { + self.send_internal_error(request_id, format!("failed to list apps: {err}")) + .await; + return; + } + }; + + let total = connectors.len(); + if total == 0 { + self.outgoing + .send_response( + request_id, + AppsListResponse { + data: Vec::new(), + next_cursor: None, + }, + ) + .await; + return; + } + + let effective_limit = limit.unwrap_or(total as u32).max(1) as usize; + let effective_limit = effective_limit.min(total); + let start = match cursor { + Some(cursor) => match cursor.parse::() { + Ok(idx) => idx, + Err(_) => { + self.send_invalid_request_error( + request_id, + format!("invalid cursor: {cursor}"), + ) + .await; + return; + } + }, + None => 0, + }; + + if start > total { + self.send_invalid_request_error( + request_id, + format!("cursor {start} exceeds total apps {total}"), + ) + .await; + return; + } + + let end = start.saturating_add(effective_limit).min(total); + let data = connectors[start..end] + .iter() + .cloned() + .map(|connector| ApiAppInfo { + id: connector.connector_id, + name: connector.connector_name, + description: connector.connector_description, + logo_url: connector.logo_url, + install_url: connector.install_url, + is_accessible: connector.is_accessible, + }) + .collect(); + + let next_cursor = if end < total { + Some(end.to_string()) + } else { + None + }; + self.outgoing + .send_response(request_id, AppsListResponse { data, next_cursor }) + .await; + } + async fn skills_list(&self, request_id: RequestId, params: SkillsListParams) { let SkillsListParams { cwds, force_reload } = params; let cwds = if cwds.is_empty() { diff --git a/codex-rs/app-server/tests/common/auth_fixtures.rs b/codex-rs/app-server/tests/common/auth_fixtures.rs index 071a920b8..9f1b62744 100644 --- a/codex-rs/app-server/tests/common/auth_fixtures.rs +++ b/codex-rs/app-server/tests/common/auth_fixtures.rs @@ -49,6 +49,16 @@ impl ChatGptAuthFixture { self } + pub fn chatgpt_user_id(mut self, chatgpt_user_id: impl Into) -> Self { + self.claims.chatgpt_user_id = Some(chatgpt_user_id.into()); + self + } + + pub fn chatgpt_account_id(mut self, chatgpt_account_id: impl Into) -> Self { + self.claims.chatgpt_account_id = Some(chatgpt_account_id.into()); + self + } + pub fn email(mut self, email: impl Into) -> Self { self.claims.email = Some(email.into()); self @@ -69,6 +79,8 @@ impl ChatGptAuthFixture { pub struct ChatGptIdTokenClaims { pub email: Option, pub plan_type: Option, + pub chatgpt_user_id: Option, + pub chatgpt_account_id: Option, } impl ChatGptIdTokenClaims { @@ -85,6 +97,16 @@ impl ChatGptIdTokenClaims { self.plan_type = Some(plan_type.into()); self } + + pub fn chatgpt_user_id(mut self, chatgpt_user_id: impl Into) -> Self { + self.chatgpt_user_id = Some(chatgpt_user_id.into()); + self + } + + pub fn chatgpt_account_id(mut self, chatgpt_account_id: impl Into) -> Self { + self.chatgpt_account_id = Some(chatgpt_account_id.into()); + self + } } pub fn encode_id_token(claims: &ChatGptIdTokenClaims) -> Result { @@ -93,10 +115,20 @@ pub fn encode_id_token(claims: &ChatGptIdTokenClaims) -> Result { if let Some(email) = &claims.email { payload.insert("email".to_string(), json!(email)); } + let mut auth_payload = serde_json::Map::new(); if let Some(plan_type) = &claims.plan_type { + auth_payload.insert("chatgpt_plan_type".to_string(), json!(plan_type)); + } + if let Some(chatgpt_user_id) = &claims.chatgpt_user_id { + auth_payload.insert("chatgpt_user_id".to_string(), json!(chatgpt_user_id)); + } + if let Some(chatgpt_account_id) = &claims.chatgpt_account_id { + auth_payload.insert("chatgpt_account_id".to_string(), json!(chatgpt_account_id)); + } + if !auth_payload.is_empty() { payload.insert( "https://api.openai.com/auth".to_string(), - json!({ "chatgpt_plan_type": plan_type }), + serde_json::Value::Object(auth_payload), ); } let payload = serde_json::Value::Object(payload); diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index 874305b59..0be10c5fc 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -12,6 +12,7 @@ use tokio::process::ChildStdout; use anyhow::Context; use codex_app_server_protocol::AddConversationListenerParams; +use codex_app_server_protocol::AppsListParams; use codex_app_server_protocol::ArchiveConversationParams; use codex_app_server_protocol::CancelLoginAccountParams; use codex_app_server_protocol::CancelLoginChatGptParams; @@ -409,6 +410,12 @@ impl McpProcess { self.send_request("model/list", params).await } + /// Send an `app/list` JSON-RPC request. + pub async fn send_apps_list_request(&mut self, params: AppsListParams) -> anyhow::Result { + let params = Some(serde_json::to_value(params)?); + self.send_request("app/list", params).await + } + /// Send a `collaborationMode/list` JSON-RPC request. pub async fn send_list_collaboration_modes_request( &mut self, diff --git a/codex-rs/app-server/tests/suite/v2/app_list.rs b/codex-rs/app-server/tests/suite/v2/app_list.rs new file mode 100644 index 000000000..4c3e22ffa --- /dev/null +++ b/codex-rs/app-server/tests/suite/v2/app_list.rs @@ -0,0 +1,381 @@ +use std::borrow::Cow; +use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use app_test_support::ChatGptAuthFixture; +use app_test_support::McpProcess; +use app_test_support::to_response; +use app_test_support::write_chatgpt_auth; +use axum::Json; +use axum::Router; +use axum::extract::State; +use axum::http::HeaderMap; +use axum::http::StatusCode; +use axum::http::header::AUTHORIZATION; +use axum::routing::post; +use codex_app_server_protocol::AppInfo; +use codex_app_server_protocol::AppsListParams; +use codex_app_server_protocol::AppsListResponse; +use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::RequestId; +use codex_core::auth::AuthCredentialsStoreMode; +use codex_core::connectors::ConnectorInfo; +use pretty_assertions::assert_eq; +use rmcp::handler::server::ServerHandler; +use rmcp::model::JsonObject; +use rmcp::model::ListToolsResult; +use rmcp::model::Meta; +use rmcp::model::ServerCapabilities; +use rmcp::model::ServerInfo; +use rmcp::model::Tool; +use rmcp::model::ToolAnnotations; +use rmcp::transport::StreamableHttpServerConfig; +use rmcp::transport::StreamableHttpService; +use rmcp::transport::streamable_http_server::session::local::LocalSessionManager; +use serde_json::json; +use tempfile::TempDir; +use tokio::net::TcpListener; +use tokio::task::JoinHandle; +use tokio::time::timeout; + +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); + +#[tokio::test] +async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { + let codex_home = TempDir::new()?; + let mut mcp = McpProcess::new(codex_home.path()).await?; + + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_apps_list_request(AppsListParams { + limit: Some(50), + cursor: None, + }) + .await?; + + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + + let AppsListResponse { data, next_cursor } = to_response(response)?; + + assert!(data.is_empty()); + assert!(next_cursor.is_none()); + Ok(()) +} + +#[tokio::test] +async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { + let connectors = vec![ + ConnectorInfo { + connector_id: "alpha".to_string(), + connector_name: "Alpha".to_string(), + connector_description: Some("Alpha connector".to_string()), + logo_url: Some("https://example.com/alpha.png".to_string()), + install_url: None, + is_accessible: false, + }, + ConnectorInfo { + connector_id: "beta".to_string(), + connector_name: "beta".to_string(), + connector_description: None, + logo_url: None, + install_url: None, + is_accessible: false, + }, + ]; + + let tools = vec![connector_tool("beta", "Beta App")?]; + let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?; + + let codex_home = TempDir::new()?; + write_connectors_config(codex_home.path(), &server_url)?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + }) + .await?; + + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + + let AppsListResponse { data, next_cursor } = to_response(response)?; + + let expected = vec![ + AppInfo { + id: "beta".to_string(), + name: "Beta App".to_string(), + description: None, + logo_url: None, + install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), + is_accessible: true, + }, + AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: Some("https://example.com/alpha.png".to_string()), + install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), + is_accessible: false, + }, + ]; + + assert_eq!(data, expected); + assert!(next_cursor.is_none()); + + server_handle.abort(); + Ok(()) +} + +#[tokio::test] +async fn list_apps_paginates_results() -> Result<()> { + let connectors = vec![ + ConnectorInfo { + connector_id: "alpha".to_string(), + connector_name: "Alpha".to_string(), + connector_description: Some("Alpha connector".to_string()), + logo_url: None, + install_url: None, + is_accessible: false, + }, + ConnectorInfo { + connector_id: "beta".to_string(), + connector_name: "beta".to_string(), + connector_description: None, + logo_url: None, + install_url: None, + is_accessible: false, + }, + ]; + + let tools = vec![connector_tool("beta", "Beta App")?]; + let (server_url, server_handle) = start_apps_server(connectors.clone(), tools).await?; + + let codex_home = TempDir::new()?; + write_connectors_config(codex_home.path(), &server_url)?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let first_request = mcp + .send_apps_list_request(AppsListParams { + limit: Some(1), + cursor: None, + }) + .await?; + let first_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(first_request)), + ) + .await??; + let AppsListResponse { + data: first_page, + next_cursor: first_cursor, + } = to_response(first_response)?; + + let expected_first = vec![AppInfo { + id: "beta".to_string(), + name: "Beta App".to_string(), + description: None, + logo_url: None, + install_url: Some("https://chatgpt.com/apps/beta/beta".to_string()), + is_accessible: true, + }]; + + assert_eq!(first_page, expected_first); + let next_cursor = first_cursor.ok_or_else(|| anyhow::anyhow!("missing cursor"))?; + + let second_request = mcp + .send_apps_list_request(AppsListParams { + limit: Some(1), + cursor: Some(next_cursor), + }) + .await?; + let second_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_request)), + ) + .await??; + let AppsListResponse { + data: second_page, + next_cursor: second_cursor, + } = to_response(second_response)?; + + let expected_second = vec![AppInfo { + id: "alpha".to_string(), + name: "Alpha".to_string(), + description: Some("Alpha connector".to_string()), + logo_url: None, + install_url: Some("https://chatgpt.com/apps/alpha/alpha".to_string()), + is_accessible: false, + }]; + + assert_eq!(second_page, expected_second); + assert!(second_cursor.is_none()); + + server_handle.abort(); + Ok(()) +} + +#[derive(Clone)] +struct AppsServerState { + expected_bearer: String, + expected_account_id: String, + response: serde_json::Value, +} + +#[derive(Clone)] +struct AppListMcpServer { + tools: Arc>, +} + +impl AppListMcpServer { + fn new(tools: Arc>) -> Self { + Self { tools } + } +} + +impl ServerHandler for AppListMcpServer { + fn get_info(&self) -> ServerInfo { + ServerInfo { + capabilities: ServerCapabilities::builder().enable_tools().build(), + ..ServerInfo::default() + } + } + + fn list_tools( + &self, + _request: Option, + _context: rmcp::service::RequestContext, + ) -> impl std::future::Future> + Send + '_ + { + let tools = self.tools.clone(); + async move { + Ok(ListToolsResult { + tools: (*tools).clone(), + next_cursor: None, + meta: None, + }) + } + } +} + +async fn start_apps_server( + connectors: Vec, + tools: Vec, +) -> Result<(String, JoinHandle<()>)> { + let state = AppsServerState { + expected_bearer: "Bearer chatgpt-token".to_string(), + expected_account_id: "account-123".to_string(), + response: json!({ "connectors": connectors }), + }; + let state = Arc::new(state); + let tools = Arc::new(tools); + + let listener = TcpListener::bind("127.0.0.1:0").await?; + let addr = listener.local_addr()?; + + let mcp_service = StreamableHttpService::new( + { + let tools = tools.clone(); + move || Ok(AppListMcpServer::new(tools.clone())) + }, + Arc::new(LocalSessionManager::default()), + StreamableHttpServerConfig::default(), + ); + + let router = Router::new() + .route("/aip/connectors/list_accessible", post(list_connectors)) + .with_state(state) + .nest_service("/api/codex/apps", mcp_service); + + let handle = tokio::spawn(async move { + let _ = axum::serve(listener, router).await; + }); + + Ok((format!("http://{addr}"), handle)) +} + +async fn list_connectors( + State(state): State>, + headers: HeaderMap, +) -> Result { + let bearer_ok = headers + .get(AUTHORIZATION) + .and_then(|value| value.to_str().ok()) + .is_some_and(|value| value == state.expected_bearer); + let account_ok = headers + .get("chatgpt-account-id") + .and_then(|value| value.to_str().ok()) + .is_some_and(|value| value == state.expected_account_id); + + if bearer_ok && account_ok { + Ok(Json(state.response.clone())) + } else { + Err(StatusCode::UNAUTHORIZED) + } +} + +fn connector_tool(connector_id: &str, connector_name: &str) -> Result { + let schema: JsonObject = serde_json::from_value(json!({ + "type": "object", + "additionalProperties": false + }))?; + let mut tool = Tool::new( + Cow::Owned(format!("connector_{connector_id}")), + Cow::Borrowed("Connector test tool"), + Arc::new(schema), + ); + tool.annotations = Some(ToolAnnotations::new().read_only(true)); + + let mut meta = Meta::new(); + meta.0 + .insert("connector_id".to_string(), json!(connector_id)); + meta.0 + .insert("connector_name".to_string(), json!(connector_name)); + tool.meta = Some(meta); + Ok(tool) +} + +fn write_connectors_config(codex_home: &std::path::Path, base_url: &str) -> std::io::Result<()> { + let config_toml = codex_home.join("config.toml"); + std::fs::write( + config_toml, + format!( + r#" +chatgpt_base_url = "{base_url}" + +[features] +connectors = true +"# + ), + ) +} diff --git a/codex-rs/app-server/tests/suite/v2/mod.rs b/codex-rs/app-server/tests/suite/v2/mod.rs index fbae748b2..95ec61f20 100644 --- a/codex-rs/app-server/tests/suite/v2/mod.rs +++ b/codex-rs/app-server/tests/suite/v2/mod.rs @@ -1,5 +1,6 @@ mod account; mod analytics; +mod app_list; mod collaboration_mode_list; mod config_rpc; mod initialize; diff --git a/codex-rs/chatgpt/src/chatgpt_client.rs b/codex-rs/chatgpt/src/chatgpt_client.rs index 752863198..b35238cde 100644 --- a/codex-rs/chatgpt/src/chatgpt_client.rs +++ b/codex-rs/chatgpt/src/chatgpt_client.rs @@ -5,6 +5,7 @@ use crate::chatgpt_token::get_chatgpt_token_data; use crate::chatgpt_token::init_chatgpt_token_from_auth; use anyhow::Context; +use serde::Serialize; use serde::de::DeserializeOwned; /// Make a GET request to the ChatGPT backend API. @@ -48,3 +49,37 @@ pub(crate) async fn chatgpt_get_request( anyhow::bail!("Request failed with status {status}: {body}") } } + +pub(crate) async fn chatgpt_post_request( + config: &Config, + access_token: &str, + account_id: &str, + path: &str, + payload: &P, +) -> anyhow::Result { + let chatgpt_base_url = &config.chatgpt_base_url; + let client = create_client(); + let url = format!("{chatgpt_base_url}{path}"); + + let response = client + .post(&url) + .bearer_auth(access_token) + .header("chatgpt-account-id", account_id) + .header("Content-Type", "application/json") + .json(payload) + .send() + .await + .context("Failed to send request")?; + + if response.status().is_success() { + let result: T = response + .json() + .await + .context("Failed to parse JSON response")?; + Ok(result) + } else { + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + anyhow::bail!("Request failed with status {status}: {body}") + } +} diff --git a/codex-rs/chatgpt/src/connectors.rs b/codex-rs/chatgpt/src/connectors.rs new file mode 100644 index 000000000..5d913ae16 --- /dev/null +++ b/codex-rs/chatgpt/src/connectors.rs @@ -0,0 +1,125 @@ +use codex_core::config::Config; +use codex_core::features::Feature; +use serde::Deserialize; +use serde::Serialize; + +use crate::chatgpt_client::chatgpt_post_request; +use crate::chatgpt_token::get_chatgpt_token_data; +use crate::chatgpt_token::init_chatgpt_token_from_auth; + +pub use codex_core::connectors::ConnectorInfo; +pub use codex_core::connectors::connector_display_label; +use codex_core::connectors::connector_install_url; +pub use codex_core::connectors::list_accessible_connectors_from_mcp_tools; +use codex_core::connectors::merge_connectors; + +#[derive(Debug, Serialize)] +struct ListConnectorsRequest { + principals: Vec, +} + +#[derive(Debug, Serialize)] +struct Principal { + #[serde(rename = "type")] + principal_type: PrincipalType, + id: String, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +enum PrincipalType { + User, +} + +#[derive(Debug, Deserialize)] +struct ListConnectorsResponse { + connectors: Vec, +} + +pub async fn list_connectors(config: &Config) -> anyhow::Result> { + if !config.features.enabled(Feature::Connectors) { + return Ok(Vec::new()); + } + let (connectors_result, accessible_result) = tokio::join!( + list_all_connectors(config), + list_accessible_connectors_from_mcp_tools(config), + ); + let connectors = connectors_result?; + let accessible = accessible_result?; + Ok(merge_connectors(connectors, accessible)) +} + +pub async fn list_all_connectors(config: &Config) -> anyhow::Result> { + if !config.features.enabled(Feature::Connectors) { + return Ok(Vec::new()); + } + init_chatgpt_token_from_auth(&config.codex_home, config.cli_auth_credentials_store_mode) + .await?; + + let token_data = + get_chatgpt_token_data().ok_or_else(|| anyhow::anyhow!("ChatGPT token not available"))?; + let user_id = token_data + .id_token + .chatgpt_user_id + .as_deref() + .ok_or_else(|| { + anyhow::anyhow!("ChatGPT user ID not available, please re-run `codex login`") + })?; + let account_id = token_data + .id_token + .chatgpt_account_id + .as_deref() + .ok_or_else(|| { + anyhow::anyhow!("ChatGPT account ID not available, please re-run `codex login`") + })?; + let principal_id = format!("{user_id}__{account_id}"); + let request = ListConnectorsRequest { + principals: vec![Principal { + principal_type: PrincipalType::User, + id: principal_id, + }], + }; + let response: ListConnectorsResponse = chatgpt_post_request( + config, + token_data.access_token.as_str(), + account_id, + "/aip/connectors/list_accessible?skip_actions=true&external_logos=true", + &request, + ) + .await?; + let mut connectors = response.connectors; + for connector in &mut connectors { + let install_url = match connector.install_url.take() { + Some(install_url) => install_url, + None => connector_install_url(&connector.connector_name, &connector.connector_id), + }; + connector.connector_name = + normalize_connector_name(&connector.connector_name, &connector.connector_id); + connector.connector_description = + normalize_connector_value(connector.connector_description.as_deref()); + connector.install_url = Some(install_url); + connector.is_accessible = false; + } + connectors.sort_by(|left, right| { + left.connector_name + .cmp(&right.connector_name) + .then_with(|| left.connector_id.cmp(&right.connector_id)) + }); + Ok(connectors) +} + +fn normalize_connector_name(name: &str, connector_id: &str) -> String { + let trimmed = name.trim(); + if trimmed.is_empty() { + connector_id.to_string() + } else { + trimmed.to_string() + } +} + +fn normalize_connector_value(value: Option<&str>) -> Option { + value + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_string) +} diff --git a/codex-rs/chatgpt/src/lib.rs b/codex-rs/chatgpt/src/lib.rs index 440a309db..0d39bb932 100644 --- a/codex-rs/chatgpt/src/lib.rs +++ b/codex-rs/chatgpt/src/lib.rs @@ -1,4 +1,5 @@ pub mod apply_command; mod chatgpt_client; mod chatgpt_token; +pub mod connectors; pub mod get_task; diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index e2d7b80b3..13cdbc674 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -153,6 +153,9 @@ "collaboration_modes": { "type": "boolean" }, + "connectors": { + "type": "boolean" + }, "elevated_windows_sandbox": { "type": "boolean" }, @@ -1135,6 +1138,9 @@ "collaboration_modes": { "type": "boolean" }, + "connectors": { + "type": "boolean" + }, "elevated_windows_sandbox": { "type": "boolean" }, diff --git a/codex-rs/core/src/auth.rs b/codex-rs/core/src/auth.rs index 523c77388..b74630927 100644 --- a/codex-rs/core/src/auth.rs +++ b/codex-rs/core/src/auth.rs @@ -996,6 +996,7 @@ mod tests { id_token: IdTokenInfo { email: Some("user@example.com".to_string()), chatgpt_plan_type: Some(InternalPlanType::Known(InternalKnownPlan::Pro)), + chatgpt_user_id: Some("user-12345".to_string()), chatgpt_account_id: None, raw_jwt: fake_jwt, }, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 1edf67ae6..ea52aad32 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -17,6 +17,7 @@ use crate::compact; use crate::compact::run_inline_auto_compact_task; use crate::compact::should_use_remote_compact_task; use crate::compact_remote::run_inline_remote_auto_compact_task; +use crate::connectors; use crate::exec_policy::ExecPolicyManager; use crate::features::Feature; use crate::features::Features; @@ -103,7 +104,10 @@ use crate::exec::StreamOutput; use crate::exec_policy::ExecPolicyUpdateError; use crate::feedback_tags; use crate::instructions::UserInstructions; +use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; use crate::mcp::auth::compute_auth_statuses; +use crate::mcp::effective_mcp_servers; +use crate::mcp::with_codex_apps_mcp; use crate::mcp_connection_manager::McpConnectionManager; use crate::model_provider_info::CHAT_WIRE_API_DEPRECATION_SUMMARY; use crate::project_doc::get_user_instructions; @@ -647,14 +651,25 @@ impl Session { let rollout_fut = RolloutRecorder::new(&config, rollout_params); let history_meta_fut = crate::message_history::history_metadata(&config); - let auth_statuses_fut = compute_auth_statuses( - config.mcp_servers.iter(), - config.mcp_oauth_credentials_store_mode, - ); + let auth_manager_clone = Arc::clone(&auth_manager); + let config_for_mcp = Arc::clone(&config); + let auth_and_mcp_fut = async move { + let auth = auth_manager_clone.auth().await; + let mcp_servers = effective_mcp_servers(&config_for_mcp, auth.as_ref()); + let auth_statuses = compute_auth_statuses( + mcp_servers.iter(), + config_for_mcp.mcp_oauth_credentials_store_mode, + ) + .await; + (auth, mcp_servers, auth_statuses) + }; // Join all independent futures. - let (rollout_recorder, (history_log_id, history_entry_count), auth_statuses) = - tokio::join!(rollout_fut, history_meta_fut, auth_statuses_fut); + let ( + rollout_recorder, + (history_log_id, history_entry_count), + (auth, mcp_servers, auth_statuses), + ) = tokio::join!(rollout_fut, history_meta_fut, auth_and_mcp_fut); let rollout_recorder = rollout_recorder.map_err(|e| { error!("failed to initialize rollout recorder: {e:#}"); @@ -694,7 +709,6 @@ impl Session { } maybe_push_chat_wire_api_deprecation(&config, &mut post_session_configured_events); - let auth = auth_manager.auth().await; let auth = auth.as_ref(); let otel_manager = OtelManager::new( conversation_id, @@ -729,7 +743,7 @@ impl Session { config.model_auto_compact_token_limit, config.approval_policy.value(), config.sandbox_policy.get().clone(), - config.mcp_servers.keys().map(String::as_str).collect(), + mcp_servers.keys().map(String::as_str).collect(), config.active_profile.clone(), ); @@ -813,7 +827,7 @@ impl Session { .write() .await .initialize( - &config.mcp_servers, + &mcp_servers, config.mcp_oauth_credentials_store_mode, auth_statuses.clone(), tx_event.clone(), @@ -1987,6 +2001,14 @@ impl Session { } }; + let auth = self.services.auth_manager.auth().await; + let config = self.get_config().await; + let mcp_servers = with_codex_apps_mcp( + mcp_servers, + self.features.enabled(Feature::Connectors), + auth.as_ref(), + config.as_ref(), + ); let auth_statuses = compute_auth_statuses(mcp_servers.iter(), store_mode).await; let sandbox_state = SandboxState { sandbox_policy: turn_context.sandbox_policy.clone(), @@ -2168,6 +2190,7 @@ mod handlers { use crate::mcp::auth::compute_auth_statuses; use crate::mcp::collect_mcp_snapshot_from_manager; + use crate::mcp::effective_mcp_servers; use crate::review_prompts::resolve_review_request; use crate::tasks::CompactTask; use crate::tasks::RegularTask; @@ -2481,13 +2504,12 @@ mod handlers { pub async fn list_mcp_tools(sess: &Session, config: &Arc, sub_id: String) { let mcp_connection_manager = sess.services.mcp_connection_manager.read().await; + let auth = sess.services.auth_manager.auth().await; + let mcp_servers = effective_mcp_servers(config, auth.as_ref()); let snapshot = collect_mcp_snapshot_from_manager( &mcp_connection_manager, - compute_auth_statuses( - config.mcp_servers.iter(), - config.mcp_oauth_credentials_store_mode, - ) - .await, + compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode) + .await, ) .await; let event = Event { @@ -3001,6 +3023,60 @@ async fn run_auto_compact(sess: &Arc, turn_context: &Arc) } } +fn filter_connectors_for_input( + connectors: Vec, + input: &[ResponseItem], +) -> Vec { + let user_messages = collect_user_messages(input); + if user_messages.is_empty() { + return Vec::new(); + } + + connectors + .into_iter() + .filter(|connector| connector_inserted_in_messages(connector, &user_messages)) + .collect() +} + +fn connector_inserted_in_messages( + connector: &connectors::ConnectorInfo, + user_messages: &[String], +) -> bool { + let label = connectors::connector_display_label(connector); + let needle = label.to_lowercase(); + let legacy = format!("{label} connector").to_lowercase(); + user_messages.iter().any(|message| { + let message = message.to_lowercase(); + message.contains(&needle) || message.contains(&legacy) + }) +} + +fn filter_codex_apps_mcp_tools( + mut mcp_tools: HashMap, + connectors: &[connectors::ConnectorInfo], +) -> HashMap { + let allowed: HashSet<&str> = connectors + .iter() + .map(|connector| connector.connector_id.as_str()) + .collect(); + + mcp_tools.retain(|_, tool| { + if tool.server_name != CODEX_APPS_MCP_SERVER_NAME { + return true; + } + let Some(connector_id) = codex_apps_connector_id(tool) else { + return false; + }; + allowed.contains(connector_id) + }); + + mcp_tools +} + +fn codex_apps_connector_id(tool: &crate::mcp_connection_manager::ToolInfo) -> Option<&str> { + tool.connector_id.as_deref() +} + #[instrument(level = "trace", skip_all, fields( @@ -3017,7 +3093,7 @@ async fn run_sampling_request( input: Vec, cancellation_token: CancellationToken, ) -> CodexResult { - let mcp_tools = sess + let mut mcp_tools = sess .services .mcp_connection_manager .read() @@ -3025,6 +3101,20 @@ async fn run_sampling_request( .list_all_tools() .or_cancel(&cancellation_token) .await?; + let connectors_for_tools = if turn_context + .client + .config() + .features + .enabled(Feature::Connectors) + { + let connectors = connectors::accessible_connectors_from_mcp_tools(&mcp_tools); + Some(filter_connectors_for_input(connectors, &input)) + } else { + None + }; + if let Some(connectors) = connectors_for_tools.as_ref() { + mcp_tools = filter_codex_apps_mcp_tools(mcp_tools, connectors); + } let router = Arc::new(ToolRouter::from_config( &turn_context.tools_config, Some( diff --git a/codex-rs/core/src/connectors.rs b/codex-rs/core/src/connectors.rs new file mode 100644 index 000000000..fa97dacc3 --- /dev/null +++ b/codex-rs/core/src/connectors.rs @@ -0,0 +1,227 @@ +use std::collections::HashMap; +use std::env; +use std::path::PathBuf; + +use async_channel::unbounded; +use codex_protocol::protocol::SandboxPolicy; +use serde::Deserialize; +use serde::Serialize; +use tokio_util::sync::CancellationToken; + +use crate::AuthManager; +use crate::SandboxState; +use crate::config::Config; +use crate::features::Feature; +use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; +use crate::mcp::auth::compute_auth_statuses; +use crate::mcp::with_codex_apps_mcp; +use crate::mcp_connection_manager::McpConnectionManager; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ConnectorInfo { + #[serde(rename = "id")] + pub connector_id: String, + #[serde(rename = "name")] + pub connector_name: String, + #[serde(default, rename = "description")] + pub connector_description: Option, + #[serde(default, rename = "logo_url")] + pub logo_url: Option, + #[serde(default, rename = "install_url")] + pub install_url: Option, + #[serde(default)] + pub is_accessible: bool, +} + +pub async fn list_accessible_connectors_from_mcp_tools( + config: &Config, +) -> anyhow::Result> { + if !config.features.enabled(Feature::Connectors) { + return Ok(Vec::new()); + } + + let auth_manager = auth_manager_from_config(config); + let auth = auth_manager.auth().await; + let mcp_servers = with_codex_apps_mcp(HashMap::new(), true, auth.as_ref(), config); + if mcp_servers.is_empty() { + return Ok(Vec::new()); + } + + let auth_status_entries = + compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode).await; + + let mut mcp_connection_manager = McpConnectionManager::default(); + let (tx_event, rx_event) = unbounded(); + drop(rx_event); + let cancel_token = CancellationToken::new(); + + let sandbox_state = SandboxState { + sandbox_policy: SandboxPolicy::ReadOnly, + codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), + sandbox_cwd: env::current_dir().unwrap_or_else(|_| PathBuf::from("/")), + }; + + mcp_connection_manager + .initialize( + &mcp_servers, + config.mcp_oauth_credentials_store_mode, + auth_status_entries, + tx_event, + cancel_token.clone(), + sandbox_state, + ) + .await; + + let tools = mcp_connection_manager.list_all_tools().await; + cancel_token.cancel(); + + Ok(accessible_connectors_from_mcp_tools(&tools)) +} + +fn auth_manager_from_config(config: &Config) -> std::sync::Arc { + AuthManager::shared( + config.codex_home.clone(), + false, + config.cli_auth_credentials_store_mode, + ) +} + +pub fn connector_display_label(connector: &ConnectorInfo) -> String { + format_connector_label(&connector.connector_name, &connector.connector_id) +} + +pub(crate) fn accessible_connectors_from_mcp_tools( + mcp_tools: &HashMap, +) -> Vec { + let tools = mcp_tools.values().filter_map(|tool| { + if tool.server_name != CODEX_APPS_MCP_SERVER_NAME { + return None; + } + let connector_id = tool.connector_id.as_deref()?; + let connector_name = normalize_connector_value(tool.connector_name.as_deref()); + Some((connector_id.to_string(), connector_name)) + }); + collect_accessible_connectors(tools) +} + +pub fn merge_connectors( + connectors: Vec, + accessible_connectors: Vec, +) -> Vec { + let mut merged: HashMap = connectors + .into_iter() + .map(|mut connector| { + connector.is_accessible = false; + (connector.connector_id.clone(), connector) + }) + .collect(); + + for mut connector in accessible_connectors { + connector.is_accessible = true; + let connector_id = connector.connector_id.clone(); + if let Some(existing) = merged.get_mut(&connector_id) { + existing.is_accessible = true; + if existing.connector_name == existing.connector_id + && connector.connector_name != connector.connector_id + { + existing.connector_name = connector.connector_name; + } + if existing.connector_description.is_none() && connector.connector_description.is_some() + { + existing.connector_description = connector.connector_description; + } + if existing.logo_url.is_none() && connector.logo_url.is_some() { + existing.logo_url = connector.logo_url; + } + } else { + merged.insert(connector_id, connector); + } + } + + let mut merged = merged.into_values().collect::>(); + for connector in &mut merged { + if connector.install_url.is_none() { + connector.install_url = Some(connector_install_url( + &connector.connector_name, + &connector.connector_id, + )); + } + } + merged.sort_by(|left, right| { + right + .is_accessible + .cmp(&left.is_accessible) + .then_with(|| left.connector_name.cmp(&right.connector_name)) + .then_with(|| left.connector_id.cmp(&right.connector_id)) + }); + merged +} + +fn collect_accessible_connectors(tools: I) -> Vec +where + I: IntoIterator)>, +{ + let mut connectors: HashMap = HashMap::new(); + for (connector_id, connector_name) in tools { + let connector_name = connector_name.unwrap_or_else(|| connector_id.clone()); + if let Some(existing_name) = connectors.get_mut(&connector_id) { + if existing_name == &connector_id && connector_name != connector_id { + *existing_name = connector_name; + } + } else { + connectors.insert(connector_id, connector_name); + } + } + let mut accessible: Vec = connectors + .into_iter() + .map(|(connector_id, connector_name)| ConnectorInfo { + install_url: Some(connector_install_url(&connector_name, &connector_id)), + connector_id, + connector_name, + connector_description: None, + logo_url: None, + is_accessible: true, + }) + .collect(); + accessible.sort_by(|left, right| { + right + .is_accessible + .cmp(&left.is_accessible) + .then_with(|| left.connector_name.cmp(&right.connector_name)) + .then_with(|| left.connector_id.cmp(&right.connector_id)) + }); + accessible +} + +fn normalize_connector_value(value: Option<&str>) -> Option { + value + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_string) +} + +pub fn connector_install_url(name: &str, connector_id: &str) -> String { + let slug = connector_name_slug(name); + format!("https://chatgpt.com/apps/{slug}/{connector_id}") +} + +fn connector_name_slug(name: &str) -> String { + let mut normalized = String::with_capacity(name.len()); + for character in name.chars() { + if character.is_ascii_alphanumeric() { + normalized.push(character.to_ascii_lowercase()); + } else { + normalized.push('-'); + } + } + let normalized = normalized.trim_matches('-'); + if normalized.is_empty() { + "app".to_string() + } else { + normalized.to_string() + } +} + +fn format_connector_label(name: &str, _id: &str) -> String { + name.to_string() +} diff --git a/codex-rs/core/src/features.rs b/codex-rs/core/src/features.rs index 407e40e3d..a408f8ffc 100644 --- a/codex-rs/core/src/features.rs +++ b/codex-rs/core/src/features.rs @@ -101,6 +101,8 @@ pub enum Feature { EnableRequestCompression, /// Enable collab tools. Collab, + /// Enable connectors (apps). + Connectors, /// Steer feature flag - when enabled, Enter submits immediately instead of queuing. Steer, /// Enable collaboration modes (Plan, Pair Programming, Execute). @@ -439,6 +441,12 @@ pub const FEATURES: &[FeatureSpec] = &[ }, default_enabled: false, }, + FeatureSpec { + id: Feature::Connectors, + key: "connectors", + stage: Stage::Beta, + default_enabled: false, + }, FeatureSpec { id: Feature::Steer, key: "steer", diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 1fc61a292..41d33a4ce 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -20,6 +20,7 @@ mod codex_delegate; mod command_safety; pub mod config; pub mod config_loader; +pub mod connectors; mod context_manager; pub mod custom_prompts; pub mod env; diff --git a/codex-rs/core/src/mcp/mod.rs b/codex-rs/core/src/mcp/mod.rs index 9e5446a74..c3513bb06 100644 --- a/codex-rs/core/src/mcp/mod.rs +++ b/codex-rs/core/src/mcp/mod.rs @@ -9,16 +9,135 @@ use codex_protocol::protocol::SandboxPolicy; use mcp_types::Tool as McpTool; use tokio_util::sync::CancellationToken; +use crate::AuthManager; +use crate::CodexAuth; use crate::config::Config; +use crate::config::types::McpServerConfig; +use crate::config::types::McpServerTransportConfig; +use crate::features::Feature; use crate::mcp::auth::compute_auth_statuses; use crate::mcp_connection_manager::McpConnectionManager; use crate::mcp_connection_manager::SandboxState; const MCP_TOOL_NAME_PREFIX: &str = "mcp"; const MCP_TOOL_NAME_DELIMITER: &str = "__"; +pub(crate) const CODEX_APPS_MCP_SERVER_NAME: &str = "codex_apps_mcp"; +const CODEX_CONNECTORS_TOKEN_ENV_VAR: &str = "CODEX_CONNECTORS_TOKEN"; + +fn codex_apps_mcp_bearer_token_env_var() -> Option { + match env::var(CODEX_CONNECTORS_TOKEN_ENV_VAR) { + Ok(value) if !value.trim().is_empty() => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()), + Ok(_) => None, + Err(env::VarError::NotPresent) => None, + Err(env::VarError::NotUnicode(_)) => Some(CODEX_CONNECTORS_TOKEN_ENV_VAR.to_string()), + } +} + +fn codex_apps_mcp_bearer_token(auth: Option<&CodexAuth>) -> Option { + let token = auth.and_then(|auth| auth.get_token().ok())?; + let token = token.trim(); + if token.is_empty() { + None + } else { + Some(token.to_string()) + } +} + +fn codex_apps_mcp_http_headers(auth: Option<&CodexAuth>) -> Option> { + let mut headers = HashMap::new(); + if let Some(token) = codex_apps_mcp_bearer_token(auth) { + headers.insert("Authorization".to_string(), format!("Bearer {token}")); + } + if let Some(account_id) = auth.and_then(CodexAuth::get_account_id) { + headers.insert("ChatGPT-Account-ID".to_string(), account_id); + } + if headers.is_empty() { + None + } else { + Some(headers) + } +} + +fn codex_apps_mcp_url(base_url: &str) -> String { + let mut base_url = base_url.trim_end_matches('/').to_string(); + if (base_url.starts_with("https://chatgpt.com") + || base_url.starts_with("https://chat.openai.com")) + && !base_url.contains("/backend-api") + { + base_url = format!("{base_url}/backend-api"); + } + if base_url.contains("/backend-api") { + format!("{base_url}/wham/apps") + } else if base_url.contains("/api/codex") { + format!("{base_url}/apps") + } else { + format!("{base_url}/api/codex/apps") + } +} + +fn codex_apps_mcp_server_config(config: &Config, auth: Option<&CodexAuth>) -> McpServerConfig { + let bearer_token_env_var = codex_apps_mcp_bearer_token_env_var(); + let http_headers = if bearer_token_env_var.is_some() { + None + } else { + codex_apps_mcp_http_headers(auth) + }; + let url = codex_apps_mcp_url(&config.chatgpt_base_url); + + McpServerConfig { + transport: McpServerTransportConfig::StreamableHttp { + url, + bearer_token_env_var, + http_headers, + env_http_headers: None, + }, + enabled: true, + disabled_reason: None, + startup_timeout_sec: None, + tool_timeout_sec: None, + enabled_tools: None, + disabled_tools: None, + } +} + +pub(crate) fn with_codex_apps_mcp( + mut servers: HashMap, + connectors_enabled: bool, + auth: Option<&CodexAuth>, + config: &Config, +) -> HashMap { + if connectors_enabled { + servers.insert( + CODEX_APPS_MCP_SERVER_NAME.to_string(), + codex_apps_mcp_server_config(config, auth), + ); + } else { + servers.remove(CODEX_APPS_MCP_SERVER_NAME); + } + servers +} + +pub(crate) fn effective_mcp_servers( + config: &Config, + auth: Option<&CodexAuth>, +) -> HashMap { + with_codex_apps_mcp( + config.mcp_servers.get().clone(), + config.features.enabled(Feature::Connectors), + auth, + config, + ) +} pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent { - if config.mcp_servers.is_empty() { + let auth_manager = AuthManager::shared( + config.codex_home.clone(), + false, + config.cli_auth_credentials_store_mode, + ); + let auth = auth_manager.auth().await; + let mcp_servers = effective_mcp_servers(config, auth.as_ref()); + if mcp_servers.is_empty() { return McpListToolsResponseEvent { tools: HashMap::new(), resources: HashMap::new(), @@ -27,11 +146,8 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent }; } - let auth_status_entries = compute_auth_statuses( - config.mcp_servers.iter(), - config.mcp_oauth_credentials_store_mode, - ) - .await; + let auth_status_entries = + compute_auth_statuses(mcp_servers.iter(), config.mcp_oauth_credentials_store_mode).await; let mut mcp_connection_manager = McpConnectionManager::default(); let (tx_event, rx_event) = unbounded(); @@ -47,7 +163,7 @@ pub async fn collect_mcp_snapshot(config: &Config) -> McpListToolsResponseEvent mcp_connection_manager .initialize( - &config.mcp_servers, + &mcp_servers, config.mcp_oauth_credentials_store_mode, auth_status_entries.clone(), tx_event, diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index 0d638760f..434db3b2f 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -153,6 +153,8 @@ pub(crate) struct ToolInfo { pub(crate) server_name: String, pub(crate) tool_name: String, pub(crate) tool: Tool, + pub(crate) connector_id: Option, + pub(crate) connector_name: Option, } type ResponderMap = HashMap<(String, RequestId), oneshot::Sender>; @@ -899,14 +901,16 @@ async fn list_tools_for_client( client: &Arc, timeout: Option, ) -> Result> { - let resp = client.list_tools(None, timeout).await?; + let resp = client.list_tools_with_connector_ids(None, timeout).await?; Ok(resp .tools .into_iter() .map(|tool| ToolInfo { server_name: server_name.to_owned(), - tool_name: tool.name.clone(), - tool, + tool_name: tool.tool.name.clone(), + tool: tool.tool, + connector_id: tool.connector_id, + connector_name: tool.connector_name, }) .collect()) } @@ -1004,6 +1008,8 @@ mod tests { output_schema: None, title: None, }, + connector_id: None, + connector_name: None, } } diff --git a/codex-rs/core/src/token_data.rs b/codex-rs/core/src/token_data.rs index 0a7694e9f..526e240a6 100644 --- a/codex-rs/core/src/token_data.rs +++ b/codex-rs/core/src/token_data.rs @@ -28,6 +28,8 @@ pub struct IdTokenInfo { /// (e.g., "free", "plus", "pro", "business", "enterprise", "edu"). /// (Note: values may vary by backend.) pub(crate) chatgpt_plan_type: Option, + /// ChatGPT user identifier associated with the token, if present. + pub chatgpt_user_id: Option, /// Organization/workspace identifier associated with the token, if present. pub chatgpt_account_id: Option, pub raw_jwt: String, @@ -74,6 +76,10 @@ struct AuthClaims { #[serde(default)] chatgpt_plan_type: Option, #[serde(default)] + chatgpt_user_id: Option, + #[serde(default)] + user_id: Option, + #[serde(default)] chatgpt_account_id: Option, } @@ -103,12 +109,14 @@ pub fn parse_id_token(id_token: &str) -> Result { email: claims.email, raw_jwt: id_token.to_string(), chatgpt_plan_type: auth.chatgpt_plan_type, + chatgpt_user_id: auth.chatgpt_user_id.or(auth.user_id), chatgpt_account_id: auth.chatgpt_account_id, }), None => Ok(IdTokenInfo { email: claims.email, raw_jwt: id_token.to_string(), chatgpt_plan_type: None, + chatgpt_user_id: None, chatgpt_account_id: None, }), } diff --git a/codex-rs/rmcp-client/src/lib.rs b/codex-rs/rmcp-client/src/lib.rs index 954898cea..e4d1f3b9f 100644 --- a/codex-rs/rmcp-client/src/lib.rs +++ b/codex-rs/rmcp-client/src/lib.rs @@ -22,5 +22,7 @@ pub use perform_oauth_login::perform_oauth_login_return_url; pub use rmcp::model::ElicitationAction; pub use rmcp_client::Elicitation; pub use rmcp_client::ElicitationResponse; +pub use rmcp_client::ListToolsWithConnectorIdResult; pub use rmcp_client::RmcpClient; pub use rmcp_client::SendElicitation; +pub use rmcp_client::ToolWithConnectorId; diff --git a/codex-rs/rmcp-client/src/rmcp_client.rs b/codex-rs/rmcp-client/src/rmcp_client.rs index b977389ea..c1bf6d39d 100644 --- a/codex-rs/rmcp-client/src/rmcp_client.rs +++ b/codex-rs/rmcp-client/src/rmcp_client.rs @@ -23,6 +23,7 @@ use mcp_types::ListToolsResult; use mcp_types::ReadResourceRequestParams; use mcp_types::ReadResourceResult; use mcp_types::RequestId; +use mcp_types::Tool; use reqwest::header::HeaderMap; use rmcp::model::CallToolRequestParam; use rmcp::model::ClientNotification; @@ -44,6 +45,7 @@ use rmcp::transport::auth::AuthClient; use rmcp::transport::auth::OAuthState; use rmcp::transport::child_process::TokioChildProcess; use rmcp::transport::streamable_http_client::StreamableHttpClientTransportConfig; +use serde_json::Value; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; use tokio::process::Command; @@ -95,6 +97,17 @@ pub type SendElicitation = Box< dyn Fn(RequestId, Elicitation) -> BoxFuture<'static, Result> + Send + Sync, >; +pub struct ToolWithConnectorId { + pub tool: Tool, + pub connector_id: Option, + pub connector_name: Option, +} + +pub struct ListToolsWithConnectorIdResult { + pub next_cursor: Option, + pub tools: Vec, +} + /// MCP client implemented on top of the official `rmcp` SDK. /// https://github.com/modelcontextprotocol/rust-sdk pub struct RmcpClient { @@ -286,6 +299,18 @@ impl RmcpClient { params: Option, timeout: Option, ) -> Result { + let result = self.list_tools_with_connector_ids(params, timeout).await?; + Ok(ListToolsResult { + next_cursor: result.next_cursor, + tools: result.tools.into_iter().map(|tool| tool.tool).collect(), + }) + } + + pub async fn list_tools_with_connector_ids( + &self, + params: Option, + timeout: Option, + ) -> Result { self.refresh_oauth_if_needed().await; let service = self.service().await?; let rmcp_params = params @@ -294,9 +319,35 @@ impl RmcpClient { let fut = service.list_tools(rmcp_params); let result = run_with_timeout(fut, timeout, "tools/list").await?; - let converted = convert_to_mcp(result)?; + let tools = result + .tools + .into_iter() + .map(|tool| { + let meta = tool.meta.as_ref(); + let connector_id = Self::meta_string(meta, "connector_id"); + let connector_name = Self::meta_string(meta, "connector_name") + .or_else(|| Self::meta_string(meta, "connector_display_name")); + let tool = convert_to_mcp(tool)?; + Ok(ToolWithConnectorId { + tool, + connector_id, + connector_name, + }) + }) + .collect::>>()?; self.persist_oauth_tokens().await; - Ok(converted) + Ok(ListToolsWithConnectorIdResult { + next_cursor: result.next_cursor, + tools, + }) + } + + fn meta_string(meta: Option<&rmcp::model::Meta>, key: &str) -> Option { + meta.and_then(|meta| meta.get(key)) + .and_then(Value::as_str) + .map(str::trim) + .filter(|value| !value.is_empty()) + .map(str::to_string) } pub async fn list_resources( diff --git a/docs/config.md b/docs/config.md index 87945b25a..9d53538ca 100644 --- a/docs/config.md +++ b/docs/config.md @@ -12,6 +12,12 @@ Codex can connect to MCP servers configured in `~/.codex/config.toml`. See the c - https://developers.openai.com/codex/config-reference +## Apps (Connectors) + +Use `$` in the composer to insert a ChatGPT connector; the popover lists accessible +apps. The `/apps` command lists available and installed apps. Connected apps appear first +and are labeled as connected; others are marked as can be installed. + ## Notify Codex can run a notification hook when the agent finishes a turn. See the configuration reference for the latest notification settings: