feat: Add One-Time Startup Remote Plugin Sync (#15264)

For early users who have already enabled apps, we should enable plugins
as part of the initial setup.
This commit is contained in:
xl-openai 2026-03-19 22:01:39 -07:00 committed by GitHub
parent cc192763e1
commit b1570d6c23
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 428 additions and 17 deletions

View file

@ -425,16 +425,16 @@ impl CodexMessageProcessor {
self.thread_manager.skills_manager().clear_cache();
}
pub(crate) async fn maybe_start_curated_repo_sync_for_latest_config(&self) {
pub(crate) async fn maybe_start_plugin_startup_tasks_for_latest_config(&self) {
match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => self
.thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(
.maybe_start_plugin_startup_tasks_for_config(
&config,
self.thread_manager.auth_manager(),
),
Err(err) => warn!("failed to load latest config for curated plugin sync: {err:?}"),
Err(err) => warn!("failed to load latest config for plugin startup tasks: {err:?}"),
}
}
@ -5489,7 +5489,7 @@ impl CodexMessageProcessor {
if force_remote_sync {
match plugins_manager
.sync_plugins_from_remote(&config, auth.as_ref())
.sync_plugins_from_remote(&config, auth.as_ref(), /*additive_only*/ false)
.await
{
Ok(sync_result) => {

View file

@ -246,7 +246,7 @@ impl MessageProcessor {
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config, auth_manager.clone());
.maybe_start_plugin_startup_tasks_for_config(&config, auth_manager.clone());
let config_api = ConfigApi::new(
config.codex_home.clone(),
cli_overrides,
@ -790,7 +790,7 @@ impl MessageProcessor {
Ok(response) => {
self.codex_message_processor.clear_plugin_related_caches();
self.codex_message_processor
.maybe_start_curated_repo_sync_for_latest_config()
.maybe_start_plugin_startup_tasks_for_latest_config()
.await;
self.outgoing.send_response(request_id, response).await;
}
@ -807,7 +807,7 @@ impl MessageProcessor {
Ok(response) => {
self.codex_message_processor.clear_plugin_related_caches();
self.codex_message_processor
.maybe_start_curated_repo_sync_for_latest_config()
.maybe_start_plugin_startup_tasks_for_latest_config()
.await;
self.outgoing.send_response(request_id, response).await;
}

View file

@ -28,6 +28,7 @@ use wiremock::matchers::path;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1";
fn write_plugins_enabled_config(codex_home: &std::path::Path) -> std::io::Result<()> {
std::fs::write(
@ -755,6 +756,91 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
Ok(())
}
#[tokio::test]
async fn app_server_startup_remote_plugin_sync_runs_once() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
write_openai_curated_marketplace(codex_home.path(), &["linear"])?;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/list"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"[
{"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}
]"#,
))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
.mount(&server)
.await;
let marker_path = codex_home
.path()
.join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE);
{
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
wait_for_path_exists(&marker_path).await?;
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;
let curated_marketplace = response
.marketplaces
.into_iter()
.find(|marketplace| marketplace.name == "openai-curated")
.expect("expected openai-curated marketplace entry");
assert_eq!(
curated_marketplace
.plugins
.into_iter()
.map(|plugin| (plugin.id, plugin.installed, plugin.enabled))
.collect::<Vec<_>>(),
vec![("linear@openai-curated".to_string(), true, true)]
);
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
}
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(config.contains(r#"[plugins."linear@openai-curated"]"#));
{
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
}
tokio::time::sleep(Duration::from_millis(250)).await;
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
Ok(())
}
#[tokio::test]
async fn plugin_list_fetches_featured_plugin_ids_without_chatgpt_auth() -> Result<()> {
let codex_home = TempDir::new()?;
@ -836,24 +922,32 @@ async fn plugin_list_uses_warmed_featured_plugin_ids_cache_on_first_request() ->
async fn wait_for_featured_plugin_request_count(
server: &MockServer,
expected_count: usize,
) -> Result<()> {
wait_for_remote_plugin_request_count(server, "/plugins/featured", expected_count).await
}
async fn wait_for_remote_plugin_request_count(
server: &MockServer,
path_suffix: &str,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
bail!("wiremock did not record requests");
};
let featured_request_count = requests
let request_count = requests
.iter()
.filter(|request| {
request.method == "GET" && request.url.path().ends_with("/plugins/featured")
request.method == "GET" && request.url.path().ends_with(path_suffix)
})
.count();
if featured_request_count == expected_count {
if request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if featured_request_count > expected_count {
if request_count > expected_count {
bail!(
"expected exactly {expected_count} /plugins/featured requests, got {featured_request_count}"
"expected exactly {expected_count} {path_suffix} requests, got {request_count}"
);
}
tokio::time::sleep(Duration::from_millis(10)).await;
@ -863,6 +957,19 @@ async fn wait_for_featured_plugin_request_count(
Ok(())
}
async fn wait_for_path_exists(path: &std::path::Path) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
if path.exists() {
return Ok::<(), anyhow::Error>(());
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}
fn write_installed_plugin(
codex_home: &TempDir,
marketplace_name: &str,

View file

@ -18,6 +18,7 @@ use super::remote::enable_remote_plugin;
use super::remote::fetch_remote_featured_plugin_ids;
use super::remote::fetch_remote_plugin_status;
use super::remote::uninstall_remote_plugin;
use super::startup_sync::start_startup_remote_plugin_sync_once;
use super::store::DEFAULT_PLUGIN_VERSION;
use super::store::PluginId;
use super::store::PluginIdError;
@ -58,7 +59,6 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use toml_edit::value;
use tracing::info;
@ -70,7 +70,8 @@ const DEFAULT_APP_CONFIG_FILE: &str = ".app.json";
pub const OPENAI_CURATED_MARKETPLACE_NAME: &str = "openai-curated";
static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false);
const MAX_CAPABILITY_SUMMARY_DESCRIPTION_LEN: usize = 1024;
const FEATURED_PLUGIN_IDS_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 3);
const FEATURED_PLUGIN_IDS_CACHE_TTL: std::time::Duration =
std::time::Duration::from_secs(60 * 60 * 3);
#[derive(Clone, PartialEq, Eq)]
struct FeaturedPluginIdsCacheKey {
@ -774,6 +775,7 @@ impl PluginsManager {
&self,
config: &Config,
auth: Option<&CodexAuth>,
additive_only: bool,
) -> Result<RemotePluginSyncResult, PluginRemoteSyncError> {
if !config.features.enabled(Feature::Plugins) {
return Ok(RemotePluginSyncResult::default());
@ -913,7 +915,7 @@ impl PluginsManager {
value: value(true),
});
}
} else {
} else if !additive_only {
if is_installed {
uninstalls.push(plugin_id);
}
@ -1110,7 +1112,7 @@ impl PluginsManager {
})
}
pub fn maybe_start_curated_repo_sync_for_config(
pub fn maybe_start_plugin_startup_tasks_for_config(
self: &Arc<Self>,
config: &Config,
auth_manager: Arc<AuthManager>,
@ -1138,6 +1140,12 @@ impl PluginsManager {
.collect::<Vec<_>>();
configured_curated_plugin_ids.sort_unstable_by_key(super::store::PluginId::as_key);
self.start_curated_repo_sync(configured_curated_plugin_ids);
start_startup_remote_plugin_sync_once(
Arc::clone(self),
self.codex_home.clone(),
config.clone(),
auth_manager.clone(),
);
let config = config.clone();
let manager = Arc::clone(self);

View file

@ -1177,7 +1177,7 @@ plugins = false
let config = load_config(tmp.path(), tmp.path()).await;
let outcome = PluginsManager::new(tmp.path().to_path_buf())
.sync_plugins_from_remote(&config, None)
.sync_plugins_from_remote(&config, None, /*additive_only*/ false)
.await
.unwrap();
@ -1533,6 +1533,7 @@ enabled = true
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
/*additive_only*/ false,
)
.await
.unwrap();
@ -1593,6 +1594,102 @@ enabled = true
);
}
#[tokio::test]
async fn sync_plugins_from_remote_additive_only_keeps_existing_plugins() {
let tmp = tempfile::tempdir().unwrap();
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["linear", "gmail", "calendar"]);
write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA);
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
"linear/local",
"linear",
);
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
"gmail/local",
"gmail",
);
write_plugin(
&tmp.path().join("plugins/cache/openai-curated"),
"calendar/local",
"calendar",
);
write_file(
&tmp.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
[plugins."linear@openai-curated"]
enabled = false
[plugins."gmail@openai-curated"]
enabled = false
[plugins."calendar@openai-curated"]
enabled = true
"#,
);
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/list"))
.and(header("authorization", "Bearer Access Token"))
.and(header("chatgpt-account-id", "account_id"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"[
{"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true},
{"id":"2","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":false}
]"#,
))
.mount(&server)
.await;
let mut config = load_config(tmp.path(), tmp.path()).await;
config.chatgpt_base_url = format!("{}/backend-api/", server.uri());
let manager = PluginsManager::new(tmp.path().to_path_buf());
let result = manager
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
/*additive_only*/ true,
)
.await
.unwrap();
assert_eq!(
result,
RemotePluginSyncResult {
installed_plugin_ids: Vec::new(),
enabled_plugin_ids: vec!["linear@openai-curated".to_string()],
disabled_plugin_ids: Vec::new(),
uninstalled_plugin_ids: Vec::new(),
}
);
assert!(
tmp.path()
.join("plugins/cache/openai-curated/linear/local")
.is_dir()
);
assert!(
tmp.path()
.join("plugins/cache/openai-curated/gmail/local")
.is_dir()
);
assert!(
tmp.path()
.join("plugins/cache/openai-curated/calendar/local")
.is_dir()
);
let config = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap();
assert!(config.contains(r#"[plugins."linear@openai-curated"]"#));
assert!(config.contains(r#"[plugins."gmail@openai-curated"]"#));
assert!(config.contains(r#"[plugins."calendar@openai-curated"]"#));
assert!(config.contains("enabled = true"));
}
#[tokio::test]
async fn sync_plugins_from_remote_ignores_unknown_remote_plugins() {
let tmp = tempfile::tempdir().unwrap();
@ -1627,6 +1724,7 @@ enabled = false
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
/*additive_only*/ false,
)
.await
.unwrap();
@ -1689,6 +1787,7 @@ enabled = false
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
/*additive_only*/ false,
)
.await
.unwrap_err();
@ -1777,6 +1876,7 @@ plugins = true
.sync_plugins_from_remote(
&config,
Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()),
/*additive_only*/ false,
)
.await
.unwrap();

View file

@ -6,6 +6,7 @@ mod manifest;
mod marketplace;
mod remote;
mod render;
mod startup_sync;
mod store;
#[cfg(test)]
pub(crate) mod test_support;

View file

@ -0,0 +1,195 @@
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tracing::info;
use tracing::warn;
use crate::AuthManager;
use crate::config::Config;
use super::PluginsManager;
const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1";
const STARTUP_REMOTE_PLUGIN_SYNC_PREREQUISITE_TIMEOUT: Duration = Duration::from_secs(5);
pub(super) fn start_startup_remote_plugin_sync_once(
manager: Arc<PluginsManager>,
codex_home: PathBuf,
config: Config,
auth_manager: Arc<AuthManager>,
) {
let marker_path = startup_remote_plugin_sync_marker_path(codex_home.as_path());
if marker_path.is_file() {
return;
}
tokio::spawn(async move {
if marker_path.is_file() {
return;
}
if !wait_for_startup_remote_plugin_sync_prerequisites(codex_home.as_path()).await {
warn!(
codex_home = %codex_home.display(),
"skipping startup remote plugin sync because curated marketplace is not ready"
);
return;
}
let auth = auth_manager.auth().await;
match manager
.sync_plugins_from_remote(&config, auth.as_ref(), /*additive_only*/ true)
.await
{
Ok(sync_result) => {
info!(
installed_plugin_ids = ?sync_result.installed_plugin_ids,
enabled_plugin_ids = ?sync_result.enabled_plugin_ids,
disabled_plugin_ids = ?sync_result.disabled_plugin_ids,
uninstalled_plugin_ids = ?sync_result.uninstalled_plugin_ids,
"completed startup remote plugin sync"
);
if let Err(err) =
write_startup_remote_plugin_sync_marker(codex_home.as_path()).await
{
warn!(
error = %err,
path = %marker_path.display(),
"failed to persist startup remote plugin sync marker"
);
}
}
Err(err) => {
warn!(
error = %err,
"startup remote plugin sync failed; will retry on next app-server start"
);
}
}
});
}
fn startup_remote_plugin_sync_marker_path(codex_home: &Path) -> PathBuf {
codex_home.join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE)
}
fn startup_remote_plugin_sync_prerequisites_ready(codex_home: &Path) -> bool {
codex_home
.join(".tmp/plugins/.agents/plugins/marketplace.json")
.is_file()
&& codex_home.join(".tmp/plugins.sha").is_file()
}
async fn wait_for_startup_remote_plugin_sync_prerequisites(codex_home: &Path) -> bool {
let deadline = tokio::time::Instant::now() + STARTUP_REMOTE_PLUGIN_SYNC_PREREQUISITE_TIMEOUT;
loop {
if startup_remote_plugin_sync_prerequisites_ready(codex_home) {
return true;
}
if tokio::time::Instant::now() >= deadline {
return false;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
async fn write_startup_remote_plugin_sync_marker(codex_home: &Path) -> std::io::Result<()> {
let marker_path = startup_remote_plugin_sync_marker_path(codex_home);
if let Some(parent) = marker_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
tokio::fs::write(marker_path, b"ok\n").await
}
#[cfg(test)]
mod tests {
use super::*;
use crate::auth::CodexAuth;
use crate::config::CONFIG_TOML_FILE;
use crate::plugins::curated_plugins_repo_path;
use crate::plugins::test_support::TEST_CURATED_PLUGIN_SHA;
use crate::plugins::test_support::write_curated_plugin_sha;
use crate::plugins::test_support::write_file;
use crate::plugins::test_support::write_openai_curated_marketplace;
use pretty_assertions::assert_eq;
use tempfile::tempdir;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::header;
use wiremock::matchers::method;
use wiremock::matchers::path;
#[tokio::test]
async fn startup_remote_plugin_sync_writes_marker_and_reconciles_state() {
let tmp = tempdir().expect("tempdir");
let curated_root = curated_plugins_repo_path(tmp.path());
write_openai_curated_marketplace(&curated_root, &["linear"]);
write_curated_plugin_sha(tmp.path());
write_file(
&tmp.path().join(CONFIG_TOML_FILE),
r#"[features]
plugins = true
[plugins."linear@openai-curated"]
enabled = false
"#,
);
let server = MockServer::start().await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/list"))
.and(header("authorization", "Bearer Access Token"))
.and(header("chatgpt-account-id", "account_id"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"[
{"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}
]"#,
))
.mount(&server)
.await;
let mut config = crate::plugins::test_support::load_plugins_config(tmp.path()).await;
config.chatgpt_base_url = format!("{}/backend-api/", server.uri());
let manager = Arc::new(PluginsManager::new(tmp.path().to_path_buf()));
let auth_manager =
AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing());
start_startup_remote_plugin_sync_once(
Arc::clone(&manager),
tmp.path().to_path_buf(),
config,
auth_manager,
);
let marker_path = tmp.path().join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE);
tokio::time::timeout(Duration::from_secs(5), async {
loop {
if marker_path.is_file() {
break;
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await
.expect("marker should be written");
assert!(
tmp.path()
.join(format!(
"plugins/cache/openai-curated/linear/{TEST_CURATED_PLUGIN_SHA}"
))
.is_dir()
);
let config = std::fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE))
.expect("config should exist");
assert!(config.contains(r#"[plugins."linear@openai-curated"]"#));
assert!(config.contains("enabled = true"));
let marker_contents =
std::fs::read_to_string(marker_path).expect("marker should be readable");
assert_eq!(marker_contents, "ok\n");
}
}