diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 2797dc366..a36ebc6a0 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -42,6 +42,13 @@ "integer", "null" ] + }, + "threadId": { + "description": "Optional thread id used to evaluate app feature gating from that thread's config.", + "type": [ + "string", + "null" + ] } }, "type": "object" diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index cdb6407b5..f6b072e7d 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -10083,6 +10083,13 @@ "integer", "null" ] + }, + "threadId": { + "description": "Optional thread id used to evaluate app feature gating from that thread's config.", + "type": [ + "string", + "null" + ] } }, "title": "AppsListParams", diff --git a/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json b/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json index 0369fec38..385e5ba29 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/AppsListParams.json @@ -21,6 +21,13 @@ "integer", "null" ] + }, + "threadId": { + "description": "Optional thread id used to evaluate app feature gating from that thread's config.", + "type": [ + "string", + "null" + ] } }, "title": "AppsListParams", diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts index 3bd769756..7687144c5 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/AppsListParams.ts @@ -14,6 +14,10 @@ cursor?: string | null, * Optional page size; defaults to a reasonable server-side value. */ limit?: number | null, +/** + * Optional thread id used to evaluate app feature gating from that thread's config. + */ +threadId?: string | null, /** * When true, bypass app caches and fetch the latest data from sources. */ diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index bc7738924..77d22f4ab 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -1107,6 +1107,27 @@ mod tests { Ok(()) } + #[test] + fn serialize_list_apps() -> Result<()> { + let request = ClientRequest::AppsList { + request_id: RequestId::Integer(8), + params: v2::AppsListParams::default(), + }; + assert_eq!( + json!({ + "method": "app/list", + "id": 8, + "params": { + "cursor": null, + "limit": null, + "threadId": null + } + }), + serde_json::to_value(&request)?, + ); + Ok(()) + } + #[test] fn serialize_list_experimental_features() -> Result<()> { let request = ClientRequest::ExperimentalFeatureList { diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index c3cfd6335..679dc4ab2 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -1201,6 +1201,9 @@ pub struct AppsListParams { /// Optional page size; defaults to a reasonable server-side value. #[ts(optional = nullable)] pub limit: Option, + /// Optional thread id used to evaluate app feature gating from that thread's config. + #[ts(optional = nullable)] + pub thread_id: Option, /// When true, bypass app caches and fetch the latest data from sources. #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub force_refetch: bool, diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 04a254a60..66d4a501e 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -722,6 +722,7 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat { "method": "app/list", "id": 50, "params": { "cursor": null, "limit": 50, + "threadId": "thr_123", "forceRefetch": false } } { "id": 50, "result": { @@ -741,6 +742,8 @@ Use `app/list` to fetch available apps (connectors). Each entry includes metadat } } ``` +When `threadId` is provided, app feature gating (`Feature::Apps`) is evaluated using that thread's config snapshot. When omitted, the latest global config is used. + `app/list` returns after both accessible apps and directory apps are loaded. Set `forceRefetch: true` to bypass app caches and fetch fresh data from sources. Cache entries are only replaced when those refetches succeed. The server also emits `app/list/updated` notifications whenever either source (accessible apps or directory apps) finishes loading. Each notification includes the latest merged app list. diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 3dfabc3c0..76656088a 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -4368,7 +4368,7 @@ impl CodexMessageProcessor { } async fn apps_list(&self, request_id: ConnectionRequestId, params: AppsListParams) { - let config = match self.load_latest_config().await { + let mut config = match self.load_latest_config().await { Ok(config) => config, Err(error) => { self.outgoing.send_error(request_id, error).await; @@ -4376,6 +4376,22 @@ impl CodexMessageProcessor { } }; + if let Some(thread_id) = params.thread_id.as_deref() { + let (_, thread) = match self.load_thread(thread_id).await { + Ok(result) => result, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + if thread.enabled(Feature::Apps) { + config.features.enable(Feature::Apps); + } else { + config.features.disable(Feature::Apps); + } + } + if !config.features.enabled(Feature::Apps) { self.outgoing .send_response( @@ -4405,6 +4421,7 @@ impl CodexMessageProcessor { let AppsListParams { cursor, limit, + thread_id: _, force_refetch, } = params; let start = match cursor { diff --git a/codex-rs/app-server/tests/suite/v2/app_list.rs b/codex-rs/app-server/tests/suite/v2/app_list.rs index 0a5d90af2..9290bae29 100644 --- a/codex-rs/app-server/tests/suite/v2/app_list.rs +++ b/codex-rs/app-server/tests/suite/v2/app_list.rs @@ -23,6 +23,8 @@ use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::ThreadStartParams; +use codex_app_server_protocol::ThreadStartResponse; use codex_core::auth::AuthCredentialsStoreMode; use pretty_assertions::assert_eq; use rmcp::handler::server::ServerHandler; @@ -55,6 +57,7 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { .send_apps_list_request(AppsListParams { limit: Some(50), cursor: None, + thread_id: None, force_refetch: false, }) .await?; @@ -72,6 +75,103 @@ async fn list_apps_returns_empty_when_connectors_disabled() -> Result<()> { Ok(()) } +#[tokio::test] +async fn list_apps_uses_thread_feature_flag_when_thread_id_is_provided() -> Result<()> { + let connectors = vec![AppInfo { + id: "beta".to_string(), + name: "Beta".to_string(), + description: Some("Beta connector".to_string()), + logo_url: None, + logo_url_dark: None, + distribution_channel: None, + install_url: None, + is_accessible: false, + }]; + let tools = vec![connector_tool("beta", "Beta App")?]; + let (server_url, server_handle) = + start_apps_server_with_delays(connectors, tools, Duration::ZERO, Duration::ZERO).await?; + + let codex_home = TempDir::new()?; + write_connectors_config(codex_home.path(), &server_url)?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let start_request = mcp + .send_thread_start_request(ThreadStartParams::default()) + .await?; + let start_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(start_request)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response(start_response)?; + + std::fs::write( + codex_home.path().join("config.toml"), + format!( + r#" +chatgpt_base_url = "{server_url}" + +[features] +connectors = false +"# + ), + )?; + + let global_request = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + thread_id: None, + force_refetch: false, + }) + .await?; + let global_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(global_request)), + ) + .await??; + let AppsListResponse { + data: global_data, + next_cursor: global_next_cursor, + } = to_response(global_response)?; + assert!(global_data.is_empty()); + assert!(global_next_cursor.is_none()); + + let thread_request = mcp + .send_apps_list_request(AppsListParams { + limit: None, + cursor: None, + thread_id: Some(thread.id), + force_refetch: false, + }) + .await?; + let thread_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_request)), + ) + .await??; + let AppsListResponse { + data: thread_data, + next_cursor: thread_next_cursor, + } = to_response(thread_response)?; + assert!(thread_data.iter().any(|app| app.id == "beta")); + assert!(thread_next_cursor.is_none()); + + server_handle.abort(); + let _ = server_handle.await; + Ok(()) +} + #[tokio::test] async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<()> { let connectors = vec![ @@ -124,6 +224,7 @@ async fn list_apps_emits_updates_and_returns_after_both_lists_load() -> Result<( .send_apps_list_request(AppsListParams { limit: None, cursor: None, + thread_id: None, force_refetch: false, }) .await?; @@ -238,6 +339,7 @@ async fn list_apps_returns_connectors_with_accessible_flags() -> Result<()> { .send_apps_list_request(AppsListParams { limit: None, cursor: None, + thread_id: None, force_refetch: false, }) .await?; @@ -360,6 +462,7 @@ async fn list_apps_paginates_results() -> Result<()> { .send_apps_list_request(AppsListParams { limit: Some(1), cursor: None, + thread_id: None, force_refetch: false, }) .await?; @@ -398,6 +501,7 @@ async fn list_apps_paginates_results() -> Result<()> { .send_apps_list_request(AppsListParams { limit: Some(1), cursor: Some(next_cursor), + thread_id: None, force_refetch: false, }) .await?; @@ -463,6 +567,7 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result .send_apps_list_request(AppsListParams { limit: None, cursor: None, + thread_id: None, force_refetch: false, }) .await?; @@ -492,6 +597,7 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result .send_apps_list_request(AppsListParams { limit: None, cursor: None, + thread_id: None, force_refetch: true, }) .await?; @@ -506,6 +612,7 @@ async fn list_apps_force_refetch_preserves_previous_cache_on_failure() -> Result .send_apps_list_request(AppsListParams { limit: None, cursor: None, + thread_id: None, force_refetch: false, }) .await?; diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 9ac448ccc..5eed41e77 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -488,6 +488,10 @@ impl Codex { pub(crate) fn state_db(&self) -> Option { self.session.state_db() } + + pub(crate) fn enabled(&self, feature: Feature) -> bool { + self.session.enabled(feature) + } } /// Context for an initialized model agent diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 0c0bbe0e0..88a052615 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -2,6 +2,7 @@ use crate::agent::AgentStatus; use crate::codex::Codex; use crate::codex::SteerInputError; use crate::error::Result as CodexResult; +use crate::features::Feature; use crate::protocol::Event; use crate::protocol::Op; use crate::protocol::Submission; @@ -83,4 +84,8 @@ impl CodexThread { pub async fn config_snapshot(&self) -> ThreadConfigSnapshot { self.codex.thread_config_snapshot().await } + + pub fn enabled(&self, feature: Feature) -> bool { + self.codex.enabled(feature) + } }