Support featured plugins (#15042)

This commit is contained in:
alexsong-oai 2026-03-18 17:45:30 -07:00 committed by GitHub
parent 81996fcde6
commit 825d09373d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 385 additions and 22 deletions

View file

@ -9340,6 +9340,13 @@
"PluginListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"featuredPluginIds": {
"default": [],
"items": {
"type": "string"
},
"type": "array"
},
"marketplaces": {
"items": {
"$ref": "#/definitions/v2/PluginMarketplaceEntry"

View file

@ -6128,6 +6128,13 @@
"PluginListResponse": {
"$schema": "http://json-schema.org/draft-07/schema#",
"properties": {
"featuredPluginIds": {
"default": [],
"items": {
"type": "string"
},
"type": "array"
},
"marketplaces": {
"items": {
"$ref": "#/definitions/PluginMarketplaceEntry"

View file

@ -239,6 +239,13 @@
}
},
"properties": {
"featuredPluginIds": {
"default": [],
"items": {
"type": "string"
},
"type": "array"
},
"marketplaces": {
"items": {
"$ref": "#/definitions/PluginMarketplaceEntry"

View file

@ -3,4 +3,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { PluginMarketplaceEntry } from "./PluginMarketplaceEntry";
export type PluginListResponse = { marketplaces: Array<PluginMarketplaceEntry>, remoteSyncError: string | null, };
export type PluginListResponse = { marketplaces: Array<PluginMarketplaceEntry>, remoteSyncError: string | null, featuredPluginIds: Array<string>, };

View file

@ -3093,6 +3093,8 @@ pub struct PluginListParams {
pub struct PluginListResponse {
pub marketplaces: Vec<PluginMarketplaceEntry>,
pub remote_sync_error: Option<String>,
#[serde(default)]
pub featured_plugin_ids: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)]

View file

@ -162,7 +162,7 @@ Example with notification opt-out:
- `experimentalFeature/list` — list feature flags with stage metadata (`beta`, `underDevelopment`, `stable`, etc.), enabled/default-enabled state, and cursor pagination. For non-beta flags, `displayName`/`description`/`announcement` are `null`.
- `collaborationMode/list` — list available collaboration mode presets (experimental, no pagination). This response omits built-in developer instructions; clients should either pass `settings.developer_instructions: null` when setting a mode to use Codex's built-in instructions, or provide their own instructions explicitly.
- `skills/list` — list skills for one or more `cwd` values (optional `forceReload`).
- `plugin/list` — list discovered plugin marketplaces and plugin state, including effective marketplace install/auth policy metadata. `interface.category` uses the marketplace category when present; otherwise it falls back to the plugin manifest category. Pass `forceRemoteSync: true` to refresh curated plugin state before listing (**under development; do not call from production clients yet**).
- `plugin/list` — list discovered plugin marketplaces and plugin state, including effective marketplace install/auth policy metadata and best-effort `featuredPluginIds` for the official curated marketplace. `interface.category` uses the marketplace category when present; otherwise it falls back to the plugin manifest category. Pass `forceRemoteSync: true` to refresh curated plugin state before listing (**under development; do not call from production clients yet**).
- `plugin/read` — read one plugin by `marketplacePath` plus `pluginName`, returning marketplace info, a list-style `summary`, manifest descriptions/interface metadata, and bundled skills/apps/MCP server names (**under development; do not call from production clients yet**).
- `skills/changed` — notification emitted when watched local skill files change.
- `app/list` — list available apps.

View file

@ -221,6 +221,7 @@ use codex_core::models_manager::collaboration_mode_presets::CollaborationModesCo
use codex_core::parse_cursor;
use codex_core::plugins::MarketplaceError;
use codex_core::plugins::MarketplacePluginSource;
use codex_core::plugins::OPENAI_CURATED_MARKETPLACE_NAME;
use codex_core::plugins::PluginInstallError as CorePluginInstallError;
use codex_core::plugins::PluginInstallRequest;
use codex_core::plugins::PluginReadRequest;
@ -424,7 +425,10 @@ impl CodexMessageProcessor {
Ok(config) => self
.thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config),
.maybe_start_curated_repo_sync_for_config(
&config,
self.thread_manager.auth_manager(),
),
Err(err) => warn!("failed to load latest config for curated plugin sync: {err:?}"),
}
}
@ -5409,9 +5413,9 @@ impl CodexMessageProcessor {
}
};
let mut remote_sync_error = None;
let auth = self.auth_manager.auth().await;
if force_remote_sync {
let auth = self.auth_manager.auth().await;
match plugins_manager
.sync_plugins_from_remote(&config, auth.as_ref())
.await
@ -5443,8 +5447,11 @@ impl CodexMessageProcessor {
};
}
let config_for_marketplace_listing = config.clone();
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
let data = match tokio::task::spawn_blocking(move || {
let marketplaces = plugins_manager.list_marketplaces_for_config(&config, &roots)?;
let marketplaces = plugins_manager_for_marketplace_listing
.list_marketplaces_for_config(&config_for_marketplace_listing, &roots)?;
Ok::<Vec<PluginMarketplaceEntry>, MarketplaceError>(
marketplaces
.into_iter()
@ -5490,12 +5497,34 @@ impl CodexMessageProcessor {
}
};
let featured_plugin_ids = if data
.iter()
.any(|marketplace| marketplace.name == OPENAI_CURATED_MARKETPLACE_NAME)
{
match plugins_manager
.featured_plugin_ids_for_config(&config, auth.as_ref())
.await
{
Ok(featured_plugin_ids) => featured_plugin_ids,
Err(err) => {
warn!(
error = %err,
"plugin/list featured plugin fetch failed; returning empty featured ids"
);
Vec::new()
}
}
} else {
Vec::new()
};
self.outgoing
.send_response(
request_id,
PluginListResponse {
marketplaces: data,
remote_sync_error,
featured_plugin_ids,
},
)
.await;

View file

@ -228,10 +228,7 @@ impl MessageProcessor {
thread_manager
.plugins_manager()
.set_analytics_events_client(analytics_events_client.clone());
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config);
let cloud_requirements = Arc::new(RwLock::new(cloud_requirements));
let codex_message_processor = CodexMessageProcessor::new(CodexMessageProcessorArgs {
auth_manager: auth_manager.clone(),
@ -244,6 +241,11 @@ impl MessageProcessor {
feedback,
log_db,
});
// Keep plugin startup warmups aligned at app-server startup.
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config, auth_manager.clone());
let config_api = ConfigApi::new(
config.codex_home.clone(),
cli_overrides,

View file

@ -261,21 +261,22 @@ async fn plugin_install_tracks_analytics_event() -> Result<()> {
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let payloads = timeout(DEFAULT_TIMEOUT, async {
let payload = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if !requests.is_empty() {
break requests;
if let Some(request) = requests.iter().find(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
break request.body.clone();
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
let payload: serde_json::Value =
serde_json::from_slice(&payloads[0].body).expect("analytics payload");
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
assert_eq!(
payload,
json!({

View file

@ -1,6 +1,7 @@
use std::time::Duration;
use anyhow::Result;
use anyhow::bail;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
@ -674,6 +675,16 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(
ResponseTemplate::new(200)
.set_body_string(r#"["linear@openai-curated","calendar@openai-curated"]"#),
)
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
@ -692,6 +703,13 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
.await??;
let response: PluginListResponse = to_response(response)?;
assert_eq!(response.remote_sync_error, None);
assert_eq!(
response.featured_plugin_ids,
vec![
"linear@openai-curated".to_string(),
"calendar@openai-curated".to_string(),
]
);
let curated_marketplace = response
.marketplaces
@ -737,6 +755,114 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
Ok(())
}
#[tokio::test]
async fn plugin_list_fetches_featured_plugin_ids_without_chatgpt_auth() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_openai_curated_marketplace(codex_home.path(), &["linear", "gmail"])?;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;
assert_eq!(
response.featured_plugin_ids,
vec!["linear@openai-curated".to_string()]
);
assert_eq!(response.remote_sync_error, None);
Ok(())
}
#[tokio::test]
async fn plugin_list_uses_warmed_featured_plugin_ids_cache_on_first_request() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_openai_curated_marketplace(codex_home.path(), &["linear", "gmail"])?;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
.expect(1)
.mount(&server)
.await;
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
wait_for_featured_plugin_request_count(&server, 1).await?;
let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;
assert_eq!(
response.featured_plugin_ids,
vec!["linear@openai-curated".to_string()]
);
assert_eq!(response.remote_sync_error, None);
Ok(())
}
async fn wait_for_featured_plugin_request_count(
server: &MockServer,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
bail!("wiremock did not record requests");
};
let featured_request_count = requests
.iter()
.filter(|request| {
request.method == "GET" && request.url.path().ends_with("/plugins/featured")
})
.count();
if featured_request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if featured_request_count > expected_count {
bail!(
"expected exactly {expected_count} /plugins/featured requests, got {featured_request_count}"
);
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
fn write_installed_plugin(
codex_home: &TempDir,
marketplace_name: &str,

View file

@ -183,21 +183,22 @@ async fn plugin_uninstall_tracks_analytics_event() -> Result<()> {
let response: PluginUninstallResponse = to_response(response)?;
assert_eq!(response, PluginUninstallResponse {});
let payloads = timeout(DEFAULT_TIMEOUT, async {
let payload = timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = analytics_server.received_requests().await else {
tokio::time::sleep(Duration::from_millis(25)).await;
continue;
};
if !requests.is_empty() {
break requests;
if let Some(request) = requests.iter().find(|request| {
request.method == "POST" && request.url.path() == "/codex/analytics-events/events"
}) {
break request.body.clone();
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
})
.await?;
let payload: serde_json::Value =
serde_json::from_slice(&payloads[0].body).expect("analytics payload");
let payload: serde_json::Value = serde_json::from_slice(&payload).expect("analytics payload");
assert_eq!(
payload,
json!({

View file

@ -15,6 +15,7 @@ use super::read_curated_plugins_sha;
use super::remote::RemotePluginFetchError;
use super::remote::RemotePluginMutationError;
use super::remote::enable_remote_plugin;
use super::remote::fetch_remote_featured_plugin_ids;
use super::remote::fetch_remote_plugin_status;
use super::remote::uninstall_remote_plugin;
use super::store::DEFAULT_PLUGIN_VERSION;
@ -24,6 +25,7 @@ use super::store::PluginInstallResult as StorePluginInstallResult;
use super::store::PluginStore;
use super::store::PluginStoreError;
use super::sync_openai_plugins_repo;
use crate::AuthManager;
use crate::analytics_client::AnalyticsEventsClient;
use crate::auth::CodexAuth;
use crate::config::Config;
@ -55,6 +57,8 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use toml_edit::value;
use tracing::info;
use tracing::warn;
@ -65,6 +69,44 @@ const DEFAULT_APP_CONFIG_FILE: &str = ".app.json";
pub const OPENAI_CURATED_MARKETPLACE_NAME: &str = "openai-curated";
static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false);
const MAX_CAPABILITY_SUMMARY_DESCRIPTION_LEN: usize = 1024;
const FEATURED_PLUGIN_IDS_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 3);
#[derive(Clone, PartialEq, Eq)]
struct FeaturedPluginIdsCacheKey {
chatgpt_base_url: String,
account_id: Option<String>,
chatgpt_user_id: Option<String>,
is_workspace_account: bool,
}
#[derive(Clone)]
struct CachedFeaturedPluginIds {
key: FeaturedPluginIdsCacheKey,
expires_at: Instant,
featured_plugin_ids: Vec<String>,
}
fn featured_plugin_ids_cache_key(
config: &Config,
auth: Option<&CodexAuth>,
) -> FeaturedPluginIdsCacheKey {
let token_data = auth.and_then(|auth| auth.get_token_data().ok());
let account_id = token_data
.as_ref()
.and_then(|token_data| token_data.account_id.clone());
let chatgpt_user_id = token_data
.as_ref()
.and_then(|token_data| token_data.id_token.chatgpt_user_id.clone());
let is_workspace_account = token_data
.as_ref()
.is_some_and(|token_data| token_data.id_token.is_workspace_account());
FeaturedPluginIdsCacheKey {
chatgpt_base_url: config.chatgpt_base_url.clone(),
account_id,
chatgpt_user_id,
is_workspace_account,
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AppConnectorId(pub String);
@ -417,6 +459,7 @@ impl From<RemotePluginFetchError> for PluginRemoteSyncError {
pub struct PluginsManager {
codex_home: PathBuf,
store: PluginStore,
featured_plugin_ids_cache: RwLock<Option<CachedFeaturedPluginIds>>,
cached_enabled_outcome: RwLock<Option<PluginLoadOutcome>>,
analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
}
@ -426,6 +469,7 @@ impl PluginsManager {
Self {
codex_home: codex_home.clone(),
store: PluginStore::new(codex_home),
featured_plugin_ids_cache: RwLock::new(None),
cached_enabled_outcome: RwLock::new(None),
analytics_events_client: RwLock::new(None),
}
@ -471,6 +515,11 @@ impl PluginsManager {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
let mut featured_plugin_ids_cache = match self.featured_plugin_ids_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
*featured_plugin_ids_cache = None;
*cached_enabled_outcome = None;
}
@ -481,6 +530,72 @@ impl PluginsManager {
}
}
fn cached_featured_plugin_ids(
&self,
cache_key: &FeaturedPluginIdsCacheKey,
) -> Option<Vec<String>> {
{
let cache = match self.featured_plugin_ids_cache.read() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
let now = Instant::now();
if let Some(cached) = cache.as_ref()
&& now < cached.expires_at
&& cached.key == *cache_key
{
return Some(cached.featured_plugin_ids.clone());
}
}
let mut cache = match self.featured_plugin_ids_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
let now = Instant::now();
if cache
.as_ref()
.is_some_and(|cached| now >= cached.expires_at || cached.key != *cache_key)
{
*cache = None;
}
None
}
fn write_featured_plugin_ids_cache(
&self,
cache_key: FeaturedPluginIdsCacheKey,
featured_plugin_ids: &[String],
) {
let mut cache = match self.featured_plugin_ids_cache.write() {
Ok(cache) => cache,
Err(err) => err.into_inner(),
};
*cache = Some(CachedFeaturedPluginIds {
key: cache_key,
expires_at: Instant::now() + FEATURED_PLUGIN_IDS_CACHE_TTL,
featured_plugin_ids: featured_plugin_ids.to_vec(),
});
}
pub async fn featured_plugin_ids_for_config(
&self,
config: &Config,
auth: Option<&CodexAuth>,
) -> Result<Vec<String>, RemotePluginFetchError> {
if !config.features.enabled(Feature::Plugins) {
return Ok(Vec::new());
}
let cache_key = featured_plugin_ids_cache_key(config, auth);
if let Some(featured_plugin_ids) = self.cached_featured_plugin_ids(&cache_key) {
return Ok(featured_plugin_ids);
}
let featured_plugin_ids = fetch_remote_featured_plugin_ids(config, auth).await?;
self.write_featured_plugin_ids_cache(cache_key, &featured_plugin_ids);
Ok(featured_plugin_ids)
}
pub async fn install_plugin(
&self,
request: PluginInstallRequest,
@ -935,7 +1050,11 @@ impl PluginsManager {
})
}
pub fn maybe_start_curated_repo_sync_for_config(self: &Arc<Self>, config: &Config) {
pub fn maybe_start_curated_repo_sync_for_config(
self: &Arc<Self>,
config: &Config,
auth_manager: Arc<AuthManager>,
) {
if config.features.enabled(Feature::Plugins) {
let mut configured_curated_plugin_ids =
configured_plugins_from_stack(&config.config_layer_stack)
@ -959,6 +1078,21 @@ impl PluginsManager {
.collect::<Vec<_>>();
configured_curated_plugin_ids.sort_unstable_by_key(super::store::PluginId::as_key);
self.start_curated_repo_sync(configured_curated_plugin_ids);
let config = config.clone();
let manager = Arc::clone(self);
tokio::spawn(async move {
let auth = auth_manager.auth().await;
if let Err(err) = manager
.featured_plugin_ids_for_config(&config, auth.as_ref())
.await
{
warn!(
error = %err,
"failed to warm featured plugin ids cache"
);
}
});
}
}

View file

@ -46,6 +46,8 @@ pub use marketplace::MarketplacePluginAuthPolicy;
pub use marketplace::MarketplacePluginInstallPolicy;
pub use marketplace::MarketplacePluginPolicy;
pub use marketplace::MarketplacePluginSource;
pub use remote::RemotePluginFetchError;
pub use remote::fetch_remote_featured_plugin_ids;
pub(crate) use render::render_explicit_plugin_instructions;
pub(crate) use render::render_plugins_section;
pub use store::PluginId;

View file

@ -7,6 +7,7 @@ use url::Url;
const DEFAULT_REMOTE_MARKETPLACE_NAME: &str = "openai-curated";
const REMOTE_PLUGIN_FETCH_TIMEOUT: Duration = Duration::from_secs(30);
const REMOTE_FEATURED_PLUGIN_FETCH_TIMEOUT: Duration = Duration::from_secs(10);
const REMOTE_PLUGIN_MUTATION_TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
@ -80,7 +81,7 @@ pub enum RemotePluginMutationError {
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum RemotePluginFetchError {
pub enum RemotePluginFetchError {
#[error("chatgpt authentication required to sync remote plugins")]
AuthRequired,
@ -158,6 +159,46 @@ pub(crate) async fn fetch_remote_plugin_status(
})
}
pub async fn fetch_remote_featured_plugin_ids(
config: &Config,
auth: Option<&CodexAuth>,
) -> Result<Vec<String>, RemotePluginFetchError> {
let base_url = config.chatgpt_base_url.trim_end_matches('/');
let url = format!("{base_url}/plugins/featured");
let client = build_reqwest_client();
let mut request = client
.get(&url)
.timeout(REMOTE_FEATURED_PLUGIN_FETCH_TIMEOUT);
if let Some(auth) = auth.filter(|auth| auth.is_chatgpt_auth()) {
let token = auth
.get_token()
.map_err(RemotePluginFetchError::AuthToken)?;
request = request.bearer_auth(token);
if let Some(account_id) = auth.get_account_id() {
request = request.header("chatgpt-account-id", account_id);
}
}
let response = request
.send()
.await
.map_err(|source| RemotePluginFetchError::Request {
url: url.clone(),
source,
})?;
let status = response.status();
let body = response.text().await.unwrap_or_default();
if !status.is_success() {
return Err(RemotePluginFetchError::UnexpectedStatus { url, status, body });
}
serde_json::from_str(&body).map_err(|source| RemotePluginFetchError::Decode {
url: url.clone(),
source,
})
}
pub(crate) async fn enable_remote_plugin(
config: &Config,
auth: Option<&CodexAuth>,

View file

@ -270,6 +270,10 @@ impl ThreadManager {
self.state.session_source.clone()
}
pub fn auth_manager(&self) -> Arc<AuthManager> {
self.state.auth_manager.clone()
}
pub fn skills_manager(&self) -> Arc<SkillsManager> {
self.state.skills_manager.clone()
}

View file

@ -2018,7 +2018,7 @@ impl App {
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config);
.maybe_start_curated_repo_sync_for_config(&config, auth_manager.clone());
let mut model = thread_manager
.get_models_manager()
.get_default_model(&config.model, RefreshStrategy::Offline)