From e2fef7a3d24c8d2b16cee652562c33a6ca95ab8a Mon Sep 17 00:00:00 2001 From: alexsong-oai Date: Fri, 27 Feb 2026 18:22:05 -0800 Subject: [PATCH] Make cloud_requirements fail close (#13063) Make it fail-close only for CLI for now Will extend this for app-server later --- codex-rs/cloud-requirements/src/lib.rs | 135 +++++++++++++++------- codex-rs/config/src/cloud_requirements.rs | 30 ++++- codex-rs/config/src/lib.rs | 1 + codex-rs/core/src/config/mod.rs | 22 ++-- codex-rs/core/src/config_loader/mod.rs | 3 +- codex-rs/core/src/config_loader/tests.rs | 32 ++++- 6 files changed, 158 insertions(+), 65 deletions(-) diff --git a/codex-rs/cloud-requirements/src/lib.rs b/codex-rs/cloud-requirements/src/lib.rs index d89ff5352..2b51a9e9e 100644 --- a/codex-rs/cloud-requirements/src/lib.rs +++ b/codex-rs/cloud-requirements/src/lib.rs @@ -4,9 +4,9 @@ //! from the local filesystem. It only applies to Business (aka Enterprise CBP) or Enterprise ChatGPT //! customers. //! -//! Today, fetching is best-effort: on error or timeout, Codex continues without cloud requirements. -//! We expect to tighten this so that Enterprise ChatGPT customers must successfully fetch these -//! requirements before Codex will run. +//! Fetching fails closed for eligible ChatGPT Business and Enterprise accounts. When cloud +//! requirements cannot be loaded for those accounts, Codex fails configuration loading rather than +//! continuing without them. use async_trait::async_trait; use base64::Engine; @@ -17,6 +17,7 @@ use chrono::Utc; use codex_backend_client::Client as BackendClient; use codex_core::AuthManager; use codex_core::auth::CodexAuth; +use codex_core::config_loader::CloudRequirementsLoadError; use codex_core::config_loader::CloudRequirementsLoader; use codex_core::config_loader::ConfigRequirementsToml; use codex_core::util::backoff; @@ -220,16 +221,34 @@ impl CloudRequirementsService { } } - async fn fetch_with_timeout(&self) -> Option { + async fn fetch_with_timeout( + &self, + ) -> Result, CloudRequirementsLoadError> { let _timer = codex_otel::start_global_timer("codex.cloud_requirements.fetch.duration_ms", &[]); let started_at = Instant::now(); let result = timeout(self.timeout, self.fetch()) .await .inspect_err(|_| { - tracing::warn!("Timed out waiting for cloud requirements; continuing without them"); + let message = format!( + "Timed out waiting for cloud requirements after {}s", + self.timeout.as_secs() + ); + tracing::error!("{message}"); + if let Some(metrics) = codex_otel::metrics::global() { + let _ = metrics.counter( + "codex.cloud_requirements.load_failure", + 1, + &[("trigger", "startup")], + ); + } }) - .ok()?; + .map_err(|_| { + CloudRequirementsLoadError::new(format!( + "timed out waiting for cloud requirements after {}s", + self.timeout.as_secs() + )) + })??; match result.as_ref() { Some(requirements) => { @@ -247,18 +266,20 @@ impl CloudRequirementsService { } } - result + Ok(result) } - async fn fetch(&self) -> Option { - let auth = self.auth_manager.auth().await?; + async fn fetch(&self) -> Result, CloudRequirementsLoadError> { + let Some(auth) = self.auth_manager.auth().await else { + return Ok(None); + }; if !auth.is_chatgpt_auth() || !matches!( auth.account_plan_type(), Some(PlanType::Business | PlanType::Enterprise) ) { - return None; + return Ok(None); } let token_data = auth.get_token_data().ok(); let chatgpt_user_id = token_data @@ -273,7 +294,7 @@ impl CloudRequirementsService { path = %self.cache_path.display(), "Using cached cloud requirements" ); - return signed_payload.requirements(); + return Ok(signed_payload.requirements()); } Err(cache_load_status) => { self.log_cache_load_status(&cache_load_status); @@ -281,7 +302,15 @@ impl CloudRequirementsService { } self.fetch_with_retries(&auth, chatgpt_user_id, account_id) - .await? + .await + .ok_or_else(|| { + let message = "failed to load your workspace-managed config"; + tracing::error!( + path = %self.cache_path.display(), + "{message}" + ); + CloudRequirementsLoadError::new(message) + }) } async fn fetch_with_retries( @@ -311,7 +340,7 @@ impl CloudRequirementsService { Some(contents) => match parse_cloud_requirements(contents) { Ok(requirements) => requirements, Err(err) => { - tracing::warn!(error = %err, "Failed to parse cloud requirements"); + tracing::error!(error = %err, "Failed to parse cloud requirements"); return None; } }, @@ -342,7 +371,7 @@ impl CloudRequirementsService { Ok(true) => {} Ok(false) => break, Err(_) => { - tracing::warn!( + tracing::error!( "Timed out refreshing cloud requirements cache from remote; keeping existing cache" ); } @@ -375,10 +404,17 @@ impl CloudRequirementsService { .await .is_none() { - tracing::warn!( + tracing::error!( path = %self.cache_path.display(), "Failed to refresh cloud requirements cache from remote" ); + if let Some(metrics) = codex_otel::metrics::global() { + let _ = metrics.counter( + "codex.cloud_requirements.load_failure", + 1, + &[("trigger", "refresh")], + ); + } } true } @@ -522,10 +558,10 @@ pub fn cloud_requirements_loader( existing_task.abort(); } CloudRequirementsLoader::new(async move { - task.await - .inspect_err(|err| tracing::warn!(error = %err, "Cloud requirements task failed")) - .ok() - .flatten() + task.await.map_err(|err| { + tracing::error!(error = %err, "Cloud requirements task failed"); + CloudRequirementsLoadError::new(format!("cloud requirements load failed: {err}")) + })? }) } @@ -692,7 +728,7 @@ mod tests { CLOUD_REQUIREMENTS_TIMEOUT, ); let result = service.fetch().await; - assert!(result.is_none()); + assert_eq!(result, Ok(None)); } #[tokio::test] @@ -705,7 +741,7 @@ mod tests { CLOUD_REQUIREMENTS_TIMEOUT, ); let result = service.fetch().await; - assert!(result.is_none()); + assert_eq!(result, Ok(None)); } #[tokio::test] @@ -721,7 +757,7 @@ mod tests { ); assert_eq!( service.fetch().await, - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::Never]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -729,7 +765,7 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); } @@ -789,7 +825,11 @@ mod tests { tokio::time::advance(CLOUD_REQUIREMENTS_TIMEOUT + Duration::from_millis(1)).await; let result = handle.await.expect("cloud requirements task"); - assert!(result.is_none()); + let err = result.expect_err("cloud requirements timeout should fail closed"); + assert!( + err.to_string() + .contains("timed out waiting for cloud requirements") + ); } #[tokio::test(start_paused = true)] @@ -812,7 +852,7 @@ mod tests { assert_eq!( handle.await.expect("cloud requirements task"), - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::Never]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -820,7 +860,7 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 2); } @@ -839,7 +879,7 @@ mod tests { CLOUD_REQUIREMENTS_TIMEOUT, ); - assert!(service.fetch().await.is_none()); + assert!(service.fetch().await.is_err()); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1); } @@ -868,7 +908,7 @@ mod tests { assert_eq!( service.fetch().await, - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::Never]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -876,7 +916,7 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 0); } @@ -895,7 +935,7 @@ mod tests { assert_eq!( service.fetch().await, - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::Never]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -903,7 +943,7 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); let path = codex_home.path().join(CLOUD_REQUIREMENTS_CACHE_FILENAME); @@ -942,7 +982,7 @@ mod tests { assert_eq!( service.fetch().await, - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::OnRequest]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -950,7 +990,7 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1); } @@ -988,7 +1028,7 @@ mod tests { assert_eq!( service.fetch().await, - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::OnRequest]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -996,7 +1036,7 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1); } @@ -1038,7 +1078,7 @@ mod tests { assert_eq!( service.fetch().await, - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::Never]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -1046,7 +1086,7 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1); } @@ -1089,7 +1129,7 @@ mod tests { assert_eq!( service.fetch().await, - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::Never]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -1097,7 +1137,7 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1); } @@ -1171,7 +1211,7 @@ mod tests { CLOUD_REQUIREMENTS_TIMEOUT, ); - assert!(service.fetch().await.is_none()); + assert_eq!(service.fetch().await, Ok(None)); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1); } @@ -1196,7 +1236,14 @@ mod tests { tokio::time::advance(Duration::from_secs(5)).await; tokio::task::yield_now().await; - assert!(handle.await.expect("cloud requirements task").is_none()); + let err = handle + .await + .expect("cloud requirements task") + .expect_err("cloud requirements retry exhaustion should fail closed"); + assert_eq!( + err.to_string(), + "failed to load your workspace-managed config" + ); assert_eq!( fetcher.request_count.load(Ordering::SeqCst), CLOUD_REQUIREMENTS_MAX_ATTEMPTS @@ -1221,7 +1268,7 @@ mod tests { assert_eq!( service.fetch().await, - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::Never]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -1229,10 +1276,10 @@ mod tests { rules: None, enforce_residency: None, network: None, - }) + })) ); - service.refresh_cache().await; + assert!(service.refresh_cache().await); let path = codex_home.path().join(CLOUD_REQUIREMENTS_CACHE_FILENAME); let cache_file: CloudRequirementsCacheFile = diff --git a/codex-rs/config/src/cloud_requirements.rs b/codex-rs/config/src/cloud_requirements.rs index cfff1824f..1cf58563b 100644 --- a/codex-rs/config/src/cloud_requirements.rs +++ b/codex-rs/config/src/cloud_requirements.rs @@ -4,24 +4,42 @@ use futures::future::FutureExt; use futures::future::Shared; use std::fmt; use std::future::Future; +use thiserror::Error; + +#[derive(Clone, Debug, Eq, Error, PartialEq)] +#[error("{message}")] +pub struct CloudRequirementsLoadError { + message: String, +} + +impl CloudRequirementsLoadError { + pub fn new(message: impl Into) -> Self { + Self { + message: message.into(), + } + } +} #[derive(Clone)] pub struct CloudRequirementsLoader { - // TODO(gt): This should return a Result once we can fail-closed. - fut: Shared>>, + fut: Shared< + BoxFuture<'static, Result, CloudRequirementsLoadError>>, + >, } impl CloudRequirementsLoader { pub fn new(fut: F) -> Self where - F: Future> + Send + 'static, + F: Future, CloudRequirementsLoadError>> + + Send + + 'static, { Self { fut: fut.boxed().shared(), } } - pub async fn get(&self) -> Option { + pub async fn get(&self) -> Result, CloudRequirementsLoadError> { self.fut.clone().await } } @@ -34,7 +52,7 @@ impl fmt::Debug for CloudRequirementsLoader { impl Default for CloudRequirementsLoader { fn default() -> Self { - Self::new(async { None }) + Self::new(async { Ok(None) }) } } @@ -52,7 +70,7 @@ mod tests { let counter_clone = Arc::clone(&counter); let loader = CloudRequirementsLoader::new(async move { counter_clone.fetch_add(1, Ordering::SeqCst); - Some(ConfigRequirementsToml::default()) + Ok(Some(ConfigRequirementsToml::default())) }); let (first, second) = tokio::join!(loader.get(), loader.get()); diff --git a/codex-rs/config/src/lib.rs b/codex-rs/config/src/lib.rs index 41b9a3ae0..b85e99133 100644 --- a/codex-rs/config/src/lib.rs +++ b/codex-rs/config/src/lib.rs @@ -10,6 +10,7 @@ mod state; pub const CONFIG_TOML_FILE: &str = "config.toml"; +pub use cloud_requirements::CloudRequirementsLoadError; pub use cloud_requirements::CloudRequirementsLoader; pub use config_requirements::ConfigRequirements; pub use config_requirements::ConfigRequirementsToml; diff --git a/codex-rs/core/src/config/mod.rs b/codex-rs/core/src/config/mod.rs index aab692cfd..dd3d2b5d5 100644 --- a/codex-rs/core/src/config/mod.rs +++ b/codex-rs/core/src/config/mod.rs @@ -5921,12 +5921,12 @@ mcp_oauth_callback_url = "https://example.com/callback" let config = ConfigBuilder::default() .codex_home(codex_home.path().to_path_buf()) .cloud_requirements(CloudRequirementsLoader::new(async { - Some(crate::config_loader::ConfigRequirementsToml { + Ok(Some(crate::config_loader::ConfigRequirementsToml { allowed_sandbox_modes: Some(vec![ crate::config_loader::SandboxModeRequirement::ReadOnly, ]), ..Default::default() - }) + })) })) .build() .await?; @@ -5962,9 +5962,9 @@ mcp_oauth_callback_url = "https://example.com/callback" let config = ConfigBuilder::default() .codex_home(codex_home.path().to_path_buf()) .fallback_cwd(Some(codex_home.path().to_path_buf())) - .cloud_requirements(CloudRequirementsLoader::new( - async move { Some(requirements) }, - )) + .cloud_requirements(CloudRequirementsLoader::new(async move { + Ok(Some(requirements)) + })) .build() .await?; assert_eq!( @@ -5988,12 +5988,12 @@ mcp_oauth_callback_url = "https://example.com/callback" .codex_home(codex_home.path().to_path_buf()) .fallback_cwd(Some(codex_home.path().to_path_buf())) .cloud_requirements(CloudRequirementsLoader::new(async { - Some(crate::config_loader::ConfigRequirementsToml { + Ok(Some(crate::config_loader::ConfigRequirementsToml { allowed_web_search_modes: Some(vec![ crate::config_loader::WebSearchModeRequirement::Cached, ]), ..Default::default() - }) + })) })) .build() .await?; @@ -6029,10 +6029,10 @@ trust_level = "untrusted" .codex_home(codex_home.path().to_path_buf()) .fallback_cwd(Some(workspace.path().to_path_buf())) .cloud_requirements(CloudRequirementsLoader::new(async { - Some(crate::config_loader::ConfigRequirementsToml { + Ok(Some(crate::config_loader::ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::OnRequest]), ..Default::default() - }) + })) })) .build() .await?; @@ -6058,10 +6058,10 @@ trust_level = "untrusted" .codex_home(codex_home.path().to_path_buf()) .fallback_cwd(Some(codex_home.path().to_path_buf())) .cloud_requirements(CloudRequirementsLoader::new(async { - Some(crate::config_loader::ConfigRequirementsToml { + Ok(Some(crate::config_loader::ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::OnRequest]), ..Default::default() - }) + })) })) .build() .await?; diff --git a/codex-rs/core/src/config_loader/mod.rs b/codex-rs/core/src/config_loader/mod.rs index 70014c03b..5ce059a9f 100644 --- a/codex-rs/core/src/config_loader/mod.rs +++ b/codex-rs/core/src/config_loader/mod.rs @@ -25,6 +25,7 @@ use std::path::Path; use std::path::PathBuf; use toml::Value as TomlValue; +pub use codex_config::CloudRequirementsLoadError; pub use codex_config::CloudRequirementsLoader; pub use codex_config::ConfigError; pub use codex_config::ConfigLayerEntry; @@ -116,7 +117,7 @@ pub async fn load_config_layers_state( ) -> io::Result { let mut config_requirements_toml = ConfigRequirementsWithSources::default(); - if let Some(requirements) = cloud_requirements.get().await { + if let Some(requirements) = cloud_requirements.get().await.map_err(io::Error::other)? { config_requirements_toml .merge_unset_fields(RequirementSource::CloudRequirements, requirements); } diff --git a/codex-rs/core/src/config_loader/tests.rs b/codex-rs/core/src/config_loader/tests.rs index 72c4b2873..05d5fa4c2 100644 --- a/codex-rs/core/src/config_loader/tests.rs +++ b/codex-rs/core/src/config_loader/tests.rs @@ -5,6 +5,7 @@ use crate::config::ConfigOverrides; use crate::config::ConfigToml; use crate::config::ConstraintError; use crate::config::ProjectConfig; +use crate::config_loader::CloudRequirementsLoadError; use crate::config_loader::CloudRequirementsLoader; use crate::config_loader::ConfigLayerEntry; use crate::config_loader::ConfigLoadError; @@ -576,7 +577,7 @@ allowed_approval_policies = ["on-request"] ..LoaderOverrides::default() }, CloudRequirementsLoader::new(async { - Some(ConfigRequirementsToml { + Ok(Some(ConfigRequirementsToml { allowed_approval_policies: Some(vec![AskForApproval::Never]), allowed_sandbox_modes: None, allowed_web_search_modes: None, @@ -584,7 +585,7 @@ allowed_approval_policies = ["on-request"] rules: None, enforce_residency: None, network: None, - }) + })) }), ) .await?; @@ -671,7 +672,7 @@ async fn load_config_layers_includes_cloud_requirements() -> anyhow::Result<()> network: None, }; let expected = requirements.clone(); - let cloud_requirements = CloudRequirementsLoader::new(async move { Some(requirements) }); + let cloud_requirements = CloudRequirementsLoader::new(async move { Ok(Some(requirements)) }); let layers = load_config_layers_state( &codex_home, @@ -702,6 +703,31 @@ async fn load_config_layers_includes_cloud_requirements() -> anyhow::Result<()> Ok(()) } +#[tokio::test] +async fn load_config_layers_fails_when_cloud_requirements_loader_fails() -> anyhow::Result<()> { + let tmp = tempdir()?; + let codex_home = tmp.path().join("home"); + tokio::fs::create_dir_all(&codex_home).await?; + let cwd = AbsolutePathBuf::from_absolute_path(tmp.path())?; + + let err = load_config_layers_state( + &codex_home, + Some(cwd), + &[] as &[(String, TomlValue)], + LoaderOverrides::default(), + CloudRequirementsLoader::new(async { + Err(CloudRequirementsLoadError::new("cloud requirements failed")) + }), + ) + .await + .expect_err("cloud requirements failure should fail closed"); + + assert_eq!(err.kind(), std::io::ErrorKind::Other); + assert!(err.to_string().contains("cloud requirements failed")); + + Ok(()) +} + #[tokio::test] async fn project_layers_prefer_closest_cwd() -> std::io::Result<()> { let tmp = tempdir()?;