diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 328827785..06e2cd3ec 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -425,16 +425,16 @@ impl CodexMessageProcessor { self.thread_manager.skills_manager().clear_cache(); } - pub(crate) async fn maybe_start_curated_repo_sync_for_latest_config(&self) { + pub(crate) async fn maybe_start_plugin_startup_tasks_for_latest_config(&self) { match self.load_latest_config(/*fallback_cwd*/ None).await { Ok(config) => self .thread_manager .plugins_manager() - .maybe_start_curated_repo_sync_for_config( + .maybe_start_plugin_startup_tasks_for_config( &config, self.thread_manager.auth_manager(), ), - Err(err) => warn!("failed to load latest config for curated plugin sync: {err:?}"), + Err(err) => warn!("failed to load latest config for plugin startup tasks: {err:?}"), } } @@ -5489,7 +5489,7 @@ impl CodexMessageProcessor { if force_remote_sync { match plugins_manager - .sync_plugins_from_remote(&config, auth.as_ref()) + .sync_plugins_from_remote(&config, auth.as_ref(), /*additive_only*/ false) .await { Ok(sync_result) => { diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 2dd682439..d70e8f47a 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -246,7 +246,7 @@ impl MessageProcessor { // TODO(xl): Move into PluginManager once this no longer depends on config feature gating. thread_manager .plugins_manager() - .maybe_start_curated_repo_sync_for_config(&config, auth_manager.clone()); + .maybe_start_plugin_startup_tasks_for_config(&config, auth_manager.clone()); let config_api = ConfigApi::new( config.codex_home.clone(), cli_overrides, @@ -790,7 +790,7 @@ impl MessageProcessor { Ok(response) => { self.codex_message_processor.clear_plugin_related_caches(); self.codex_message_processor - .maybe_start_curated_repo_sync_for_latest_config() + .maybe_start_plugin_startup_tasks_for_latest_config() .await; self.outgoing.send_response(request_id, response).await; } @@ -807,7 +807,7 @@ impl MessageProcessor { Ok(response) => { self.codex_message_processor.clear_plugin_related_caches(); self.codex_message_processor - .maybe_start_curated_repo_sync_for_latest_config() + .maybe_start_plugin_startup_tasks_for_latest_config() .await; self.outgoing.send_response(request_id, response).await; } diff --git a/codex-rs/app-server/tests/suite/v2/plugin_list.rs b/codex-rs/app-server/tests/suite/v2/plugin_list.rs index 17c772c94..a95871430 100644 --- a/codex-rs/app-server/tests/suite/v2/plugin_list.rs +++ b/codex-rs/app-server/tests/suite/v2/plugin_list.rs @@ -28,6 +28,7 @@ use wiremock::matchers::path; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567"; +const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1"; fn write_plugins_enabled_config(codex_home: &std::path::Path) -> std::io::Result<()> { std::fs::write( @@ -755,6 +756,91 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu Ok(()) } +#[tokio::test] +async fn app_server_startup_remote_plugin_sync_runs_once() -> Result<()> { + let codex_home = TempDir::new()?; + let server = MockServer::start().await; + write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + write_openai_curated_marketplace(codex_home.path(), &["linear"])?; + + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true} +]"#, + )) + .mount(&server) + .await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/featured")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#)) + .mount(&server) + .await; + + let marker_path = codex_home + .path() + .join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE); + + { + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + wait_for_path_exists(&marker_path).await?; + wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?; + let request_id = mcp + .send_plugin_list_request(PluginListParams { + cwds: None, + force_remote_sync: false, + }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let response: PluginListResponse = to_response(response)?; + let curated_marketplace = response + .marketplaces + .into_iter() + .find(|marketplace| marketplace.name == "openai-curated") + .expect("expected openai-curated marketplace entry"); + assert_eq!( + curated_marketplace + .plugins + .into_iter() + .map(|plugin| (plugin.id, plugin.installed, plugin.enabled)) + .collect::>(), + vec![("linear@openai-curated".to_string(), true, true)] + ); + wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?; + } + + let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?; + assert!(config.contains(r#"[plugins."linear@openai-curated"]"#)); + + { + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + } + + tokio::time::sleep(Duration::from_millis(250)).await; + wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?; + Ok(()) +} + #[tokio::test] async fn plugin_list_fetches_featured_plugin_ids_without_chatgpt_auth() -> Result<()> { let codex_home = TempDir::new()?; @@ -836,24 +922,32 @@ async fn plugin_list_uses_warmed_featured_plugin_ids_cache_on_first_request() -> async fn wait_for_featured_plugin_request_count( server: &MockServer, expected_count: usize, +) -> Result<()> { + wait_for_remote_plugin_request_count(server, "/plugins/featured", expected_count).await +} + +async fn wait_for_remote_plugin_request_count( + server: &MockServer, + path_suffix: &str, + expected_count: usize, ) -> Result<()> { timeout(DEFAULT_TIMEOUT, async { loop { let Some(requests) = server.received_requests().await else { bail!("wiremock did not record requests"); }; - let featured_request_count = requests + let request_count = requests .iter() .filter(|request| { - request.method == "GET" && request.url.path().ends_with("/plugins/featured") + request.method == "GET" && request.url.path().ends_with(path_suffix) }) .count(); - if featured_request_count == expected_count { + if request_count == expected_count { return Ok::<(), anyhow::Error>(()); } - if featured_request_count > expected_count { + if request_count > expected_count { bail!( - "expected exactly {expected_count} /plugins/featured requests, got {featured_request_count}" + "expected exactly {expected_count} {path_suffix} requests, got {request_count}" ); } tokio::time::sleep(Duration::from_millis(10)).await; @@ -863,6 +957,19 @@ async fn wait_for_featured_plugin_request_count( Ok(()) } +async fn wait_for_path_exists(path: &std::path::Path) -> Result<()> { + timeout(DEFAULT_TIMEOUT, async { + loop { + if path.exists() { + return Ok::<(), anyhow::Error>(()); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await??; + Ok(()) +} + fn write_installed_plugin( codex_home: &TempDir, marketplace_name: &str, diff --git a/codex-rs/core/src/plugins/manager.rs b/codex-rs/core/src/plugins/manager.rs index 6f00bf5c7..5498d762c 100644 --- a/codex-rs/core/src/plugins/manager.rs +++ b/codex-rs/core/src/plugins/manager.rs @@ -18,6 +18,7 @@ use super::remote::enable_remote_plugin; use super::remote::fetch_remote_featured_plugin_ids; use super::remote::fetch_remote_plugin_status; use super::remote::uninstall_remote_plugin; +use super::startup_sync::start_startup_remote_plugin_sync_once; use super::store::DEFAULT_PLUGIN_VERSION; use super::store::PluginId; use super::store::PluginIdError; @@ -58,7 +59,6 @@ use std::sync::Arc; use std::sync::RwLock; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use std::time::Duration; use std::time::Instant; use toml_edit::value; use tracing::info; @@ -70,7 +70,8 @@ const DEFAULT_APP_CONFIG_FILE: &str = ".app.json"; pub const OPENAI_CURATED_MARKETPLACE_NAME: &str = "openai-curated"; static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false); const MAX_CAPABILITY_SUMMARY_DESCRIPTION_LEN: usize = 1024; -const FEATURED_PLUGIN_IDS_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 3); +const FEATURED_PLUGIN_IDS_CACHE_TTL: std::time::Duration = + std::time::Duration::from_secs(60 * 60 * 3); #[derive(Clone, PartialEq, Eq)] struct FeaturedPluginIdsCacheKey { @@ -774,6 +775,7 @@ impl PluginsManager { &self, config: &Config, auth: Option<&CodexAuth>, + additive_only: bool, ) -> Result { if !config.features.enabled(Feature::Plugins) { return Ok(RemotePluginSyncResult::default()); @@ -913,7 +915,7 @@ impl PluginsManager { value: value(true), }); } - } else { + } else if !additive_only { if is_installed { uninstalls.push(plugin_id); } @@ -1110,7 +1112,7 @@ impl PluginsManager { }) } - pub fn maybe_start_curated_repo_sync_for_config( + pub fn maybe_start_plugin_startup_tasks_for_config( self: &Arc, config: &Config, auth_manager: Arc, @@ -1138,6 +1140,12 @@ impl PluginsManager { .collect::>(); configured_curated_plugin_ids.sort_unstable_by_key(super::store::PluginId::as_key); self.start_curated_repo_sync(configured_curated_plugin_ids); + start_startup_remote_plugin_sync_once( + Arc::clone(self), + self.codex_home.clone(), + config.clone(), + auth_manager.clone(), + ); let config = config.clone(); let manager = Arc::clone(self); diff --git a/codex-rs/core/src/plugins/manager_tests.rs b/codex-rs/core/src/plugins/manager_tests.rs index 6f474c747..c44343380 100644 --- a/codex-rs/core/src/plugins/manager_tests.rs +++ b/codex-rs/core/src/plugins/manager_tests.rs @@ -1177,7 +1177,7 @@ plugins = false let config = load_config(tmp.path(), tmp.path()).await; let outcome = PluginsManager::new(tmp.path().to_path_buf()) - .sync_plugins_from_remote(&config, None) + .sync_plugins_from_remote(&config, None, /*additive_only*/ false) .await .unwrap(); @@ -1533,6 +1533,7 @@ enabled = true .sync_plugins_from_remote( &config, Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ false, ) .await .unwrap(); @@ -1593,6 +1594,102 @@ enabled = true ); } +#[tokio::test] +async fn sync_plugins_from_remote_additive_only_keeps_existing_plugins() { + let tmp = tempfile::tempdir().unwrap(); + let curated_root = curated_plugins_repo_path(tmp.path()); + write_openai_curated_marketplace(&curated_root, &["linear", "gmail", "calendar"]); + write_curated_plugin_sha(tmp.path(), TEST_CURATED_PLUGIN_SHA); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "linear/local", + "linear", + ); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "gmail/local", + "gmail", + ); + write_plugin( + &tmp.path().join("plugins/cache/openai-curated"), + "calendar/local", + "calendar", + ); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true + +[plugins."linear@openai-curated"] +enabled = false + +[plugins."gmail@openai-curated"] +enabled = false + +[plugins."calendar@openai-curated"] +enabled = true +"#, + ); + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .and(header("authorization", "Bearer Access Token")) + .and(header("chatgpt-account-id", "account_id")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}, + {"id":"2","name":"gmail","marketplace_name":"openai-curated","version":"1.0.0","enabled":false} +]"#, + )) + .mount(&server) + .await; + + let mut config = load_config(tmp.path(), tmp.path()).await; + config.chatgpt_base_url = format!("{}/backend-api/", server.uri()); + let manager = PluginsManager::new(tmp.path().to_path_buf()); + let result = manager + .sync_plugins_from_remote( + &config, + Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ true, + ) + .await + .unwrap(); + + assert_eq!( + result, + RemotePluginSyncResult { + installed_plugin_ids: Vec::new(), + enabled_plugin_ids: vec!["linear@openai-curated".to_string()], + disabled_plugin_ids: Vec::new(), + uninstalled_plugin_ids: Vec::new(), + } + ); + + assert!( + tmp.path() + .join("plugins/cache/openai-curated/linear/local") + .is_dir() + ); + assert!( + tmp.path() + .join("plugins/cache/openai-curated/gmail/local") + .is_dir() + ); + assert!( + tmp.path() + .join("plugins/cache/openai-curated/calendar/local") + .is_dir() + ); + + let config = fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)).unwrap(); + assert!(config.contains(r#"[plugins."linear@openai-curated"]"#)); + assert!(config.contains(r#"[plugins."gmail@openai-curated"]"#)); + assert!(config.contains(r#"[plugins."calendar@openai-curated"]"#)); + assert!(config.contains("enabled = true")); +} + #[tokio::test] async fn sync_plugins_from_remote_ignores_unknown_remote_plugins() { let tmp = tempfile::tempdir().unwrap(); @@ -1627,6 +1724,7 @@ enabled = false .sync_plugins_from_remote( &config, Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ false, ) .await .unwrap(); @@ -1689,6 +1787,7 @@ enabled = false .sync_plugins_from_remote( &config, Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ false, ) .await .unwrap_err(); @@ -1777,6 +1876,7 @@ plugins = true .sync_plugins_from_remote( &config, Some(&CodexAuth::create_dummy_chatgpt_auth_for_testing()), + /*additive_only*/ false, ) .await .unwrap(); diff --git a/codex-rs/core/src/plugins/mod.rs b/codex-rs/core/src/plugins/mod.rs index 895a633e6..ec338d191 100644 --- a/codex-rs/core/src/plugins/mod.rs +++ b/codex-rs/core/src/plugins/mod.rs @@ -6,6 +6,7 @@ mod manifest; mod marketplace; mod remote; mod render; +mod startup_sync; mod store; #[cfg(test)] pub(crate) mod test_support; diff --git a/codex-rs/core/src/plugins/startup_sync.rs b/codex-rs/core/src/plugins/startup_sync.rs new file mode 100644 index 000000000..b63cfbb09 --- /dev/null +++ b/codex-rs/core/src/plugins/startup_sync.rs @@ -0,0 +1,195 @@ +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Duration; + +use tracing::info; +use tracing::warn; + +use crate::AuthManager; +use crate::config::Config; + +use super::PluginsManager; + +const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1"; +const STARTUP_REMOTE_PLUGIN_SYNC_PREREQUISITE_TIMEOUT: Duration = Duration::from_secs(5); + +pub(super) fn start_startup_remote_plugin_sync_once( + manager: Arc, + codex_home: PathBuf, + config: Config, + auth_manager: Arc, +) { + let marker_path = startup_remote_plugin_sync_marker_path(codex_home.as_path()); + if marker_path.is_file() { + return; + } + + tokio::spawn(async move { + if marker_path.is_file() { + return; + } + + if !wait_for_startup_remote_plugin_sync_prerequisites(codex_home.as_path()).await { + warn!( + codex_home = %codex_home.display(), + "skipping startup remote plugin sync because curated marketplace is not ready" + ); + return; + } + + let auth = auth_manager.auth().await; + match manager + .sync_plugins_from_remote(&config, auth.as_ref(), /*additive_only*/ true) + .await + { + Ok(sync_result) => { + info!( + installed_plugin_ids = ?sync_result.installed_plugin_ids, + enabled_plugin_ids = ?sync_result.enabled_plugin_ids, + disabled_plugin_ids = ?sync_result.disabled_plugin_ids, + uninstalled_plugin_ids = ?sync_result.uninstalled_plugin_ids, + "completed startup remote plugin sync" + ); + if let Err(err) = + write_startup_remote_plugin_sync_marker(codex_home.as_path()).await + { + warn!( + error = %err, + path = %marker_path.display(), + "failed to persist startup remote plugin sync marker" + ); + } + } + Err(err) => { + warn!( + error = %err, + "startup remote plugin sync failed; will retry on next app-server start" + ); + } + } + }); +} + +fn startup_remote_plugin_sync_marker_path(codex_home: &Path) -> PathBuf { + codex_home.join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE) +} + +fn startup_remote_plugin_sync_prerequisites_ready(codex_home: &Path) -> bool { + codex_home + .join(".tmp/plugins/.agents/plugins/marketplace.json") + .is_file() + && codex_home.join(".tmp/plugins.sha").is_file() +} + +async fn wait_for_startup_remote_plugin_sync_prerequisites(codex_home: &Path) -> bool { + let deadline = tokio::time::Instant::now() + STARTUP_REMOTE_PLUGIN_SYNC_PREREQUISITE_TIMEOUT; + loop { + if startup_remote_plugin_sync_prerequisites_ready(codex_home) { + return true; + } + if tokio::time::Instant::now() >= deadline { + return false; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } +} + +async fn write_startup_remote_plugin_sync_marker(codex_home: &Path) -> std::io::Result<()> { + let marker_path = startup_remote_plugin_sync_marker_path(codex_home); + if let Some(parent) = marker_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + tokio::fs::write(marker_path, b"ok\n").await +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::auth::CodexAuth; + use crate::config::CONFIG_TOML_FILE; + use crate::plugins::curated_plugins_repo_path; + use crate::plugins::test_support::TEST_CURATED_PLUGIN_SHA; + use crate::plugins::test_support::write_curated_plugin_sha; + use crate::plugins::test_support::write_file; + use crate::plugins::test_support::write_openai_curated_marketplace; + use pretty_assertions::assert_eq; + use tempfile::tempdir; + use wiremock::Mock; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + use wiremock::matchers::header; + use wiremock::matchers::method; + use wiremock::matchers::path; + + #[tokio::test] + async fn startup_remote_plugin_sync_writes_marker_and_reconciles_state() { + let tmp = tempdir().expect("tempdir"); + let curated_root = curated_plugins_repo_path(tmp.path()); + write_openai_curated_marketplace(&curated_root, &["linear"]); + write_curated_plugin_sha(tmp.path()); + write_file( + &tmp.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true + +[plugins."linear@openai-curated"] +enabled = false +"#, + ); + + let server = MockServer::start().await; + Mock::given(method("GET")) + .and(path("/backend-api/plugins/list")) + .and(header("authorization", "Bearer Access Token")) + .and(header("chatgpt-account-id", "account_id")) + .respond_with(ResponseTemplate::new(200).set_body_string( + r#"[ + {"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true} +]"#, + )) + .mount(&server) + .await; + + let mut config = crate::plugins::test_support::load_plugins_config(tmp.path()).await; + config.chatgpt_base_url = format!("{}/backend-api/", server.uri()); + let manager = Arc::new(PluginsManager::new(tmp.path().to_path_buf())); + let auth_manager = + AuthManager::from_auth_for_testing(CodexAuth::create_dummy_chatgpt_auth_for_testing()); + + start_startup_remote_plugin_sync_once( + Arc::clone(&manager), + tmp.path().to_path_buf(), + config, + auth_manager, + ); + + let marker_path = tmp.path().join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE); + tokio::time::timeout(Duration::from_secs(5), async { + loop { + if marker_path.is_file() { + break; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .expect("marker should be written"); + + assert!( + tmp.path() + .join(format!( + "plugins/cache/openai-curated/linear/{TEST_CURATED_PLUGIN_SHA}" + )) + .is_dir() + ); + let config = std::fs::read_to_string(tmp.path().join(CONFIG_TOML_FILE)) + .expect("config should exist"); + assert!(config.contains(r#"[plugins."linear@openai-curated"]"#)); + assert!(config.contains("enabled = true")); + + let marker_contents = + std::fs::read_to_string(marker_path).expect("marker should be readable"); + assert_eq!(marker_contents, "ok\n"); + } +}