Make cloud_requirements fail close (#13063)

Make it fail-close only for CLI for now
Will extend this for app-server later
This commit is contained in:
alexsong-oai 2026-02-27 18:22:05 -08:00 committed by GitHub
parent e6032eb0b7
commit e2fef7a3d2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 158 additions and 65 deletions

View file

@ -4,9 +4,9 @@
//! from the local filesystem. It only applies to Business (aka Enterprise CBP) or Enterprise ChatGPT
//! customers.
//!
//! Today, fetching is best-effort: on error or timeout, Codex continues without cloud requirements.
//! We expect to tighten this so that Enterprise ChatGPT customers must successfully fetch these
//! requirements before Codex will run.
//! Fetching fails closed for eligible ChatGPT Business and Enterprise accounts. When cloud
//! requirements cannot be loaded for those accounts, Codex fails configuration loading rather than
//! continuing without them.
use async_trait::async_trait;
use base64::Engine;
@ -17,6 +17,7 @@ use chrono::Utc;
use codex_backend_client::Client as BackendClient;
use codex_core::AuthManager;
use codex_core::auth::CodexAuth;
use codex_core::config_loader::CloudRequirementsLoadError;
use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::config_loader::ConfigRequirementsToml;
use codex_core::util::backoff;
@ -220,16 +221,34 @@ impl CloudRequirementsService {
}
}
async fn fetch_with_timeout(&self) -> Option<ConfigRequirementsToml> {
async fn fetch_with_timeout(
&self,
) -> Result<Option<ConfigRequirementsToml>, CloudRequirementsLoadError> {
let _timer =
codex_otel::start_global_timer("codex.cloud_requirements.fetch.duration_ms", &[]);
let started_at = Instant::now();
let result = timeout(self.timeout, self.fetch())
.await
.inspect_err(|_| {
tracing::warn!("Timed out waiting for cloud requirements; continuing without them");
let message = format!(
"Timed out waiting for cloud requirements after {}s",
self.timeout.as_secs()
);
tracing::error!("{message}");
if let Some(metrics) = codex_otel::metrics::global() {
let _ = metrics.counter(
"codex.cloud_requirements.load_failure",
1,
&[("trigger", "startup")],
);
}
})
.ok()?;
.map_err(|_| {
CloudRequirementsLoadError::new(format!(
"timed out waiting for cloud requirements after {}s",
self.timeout.as_secs()
))
})??;
match result.as_ref() {
Some(requirements) => {
@ -247,18 +266,20 @@ impl CloudRequirementsService {
}
}
result
Ok(result)
}
async fn fetch(&self) -> Option<ConfigRequirementsToml> {
let auth = self.auth_manager.auth().await?;
async fn fetch(&self) -> Result<Option<ConfigRequirementsToml>, CloudRequirementsLoadError> {
let Some(auth) = self.auth_manager.auth().await else {
return Ok(None);
};
if !auth.is_chatgpt_auth()
|| !matches!(
auth.account_plan_type(),
Some(PlanType::Business | PlanType::Enterprise)
)
{
return None;
return Ok(None);
}
let token_data = auth.get_token_data().ok();
let chatgpt_user_id = token_data
@ -273,7 +294,7 @@ impl CloudRequirementsService {
path = %self.cache_path.display(),
"Using cached cloud requirements"
);
return signed_payload.requirements();
return Ok(signed_payload.requirements());
}
Err(cache_load_status) => {
self.log_cache_load_status(&cache_load_status);
@ -281,7 +302,15 @@ impl CloudRequirementsService {
}
self.fetch_with_retries(&auth, chatgpt_user_id, account_id)
.await?
.await
.ok_or_else(|| {
let message = "failed to load your workspace-managed config";
tracing::error!(
path = %self.cache_path.display(),
"{message}"
);
CloudRequirementsLoadError::new(message)
})
}
async fn fetch_with_retries(
@ -311,7 +340,7 @@ impl CloudRequirementsService {
Some(contents) => match parse_cloud_requirements(contents) {
Ok(requirements) => requirements,
Err(err) => {
tracing::warn!(error = %err, "Failed to parse cloud requirements");
tracing::error!(error = %err, "Failed to parse cloud requirements");
return None;
}
},
@ -342,7 +371,7 @@ impl CloudRequirementsService {
Ok(true) => {}
Ok(false) => break,
Err(_) => {
tracing::warn!(
tracing::error!(
"Timed out refreshing cloud requirements cache from remote; keeping existing cache"
);
}
@ -375,10 +404,17 @@ impl CloudRequirementsService {
.await
.is_none()
{
tracing::warn!(
tracing::error!(
path = %self.cache_path.display(),
"Failed to refresh cloud requirements cache from remote"
);
if let Some(metrics) = codex_otel::metrics::global() {
let _ = metrics.counter(
"codex.cloud_requirements.load_failure",
1,
&[("trigger", "refresh")],
);
}
}
true
}
@ -522,10 +558,10 @@ pub fn cloud_requirements_loader(
existing_task.abort();
}
CloudRequirementsLoader::new(async move {
task.await
.inspect_err(|err| tracing::warn!(error = %err, "Cloud requirements task failed"))
.ok()
.flatten()
task.await.map_err(|err| {
tracing::error!(error = %err, "Cloud requirements task failed");
CloudRequirementsLoadError::new(format!("cloud requirements load failed: {err}"))
})?
})
}
@ -692,7 +728,7 @@ mod tests {
CLOUD_REQUIREMENTS_TIMEOUT,
);
let result = service.fetch().await;
assert!(result.is_none());
assert_eq!(result, Ok(None));
}
#[tokio::test]
@ -705,7 +741,7 @@ mod tests {
CLOUD_REQUIREMENTS_TIMEOUT,
);
let result = service.fetch().await;
assert!(result.is_none());
assert_eq!(result, Ok(None));
}
#[tokio::test]
@ -721,7 +757,7 @@ mod tests {
);
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -729,7 +765,7 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
}
@ -789,7 +825,11 @@ mod tests {
tokio::time::advance(CLOUD_REQUIREMENTS_TIMEOUT + Duration::from_millis(1)).await;
let result = handle.await.expect("cloud requirements task");
assert!(result.is_none());
let err = result.expect_err("cloud requirements timeout should fail closed");
assert!(
err.to_string()
.contains("timed out waiting for cloud requirements")
);
}
#[tokio::test(start_paused = true)]
@ -812,7 +852,7 @@ mod tests {
assert_eq!(
handle.await.expect("cloud requirements task"),
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -820,7 +860,7 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 2);
}
@ -839,7 +879,7 @@ mod tests {
CLOUD_REQUIREMENTS_TIMEOUT,
);
assert!(service.fetch().await.is_none());
assert!(service.fetch().await.is_err());
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1);
}
@ -868,7 +908,7 @@ mod tests {
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -876,7 +916,7 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 0);
}
@ -895,7 +935,7 @@ mod tests {
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -903,7 +943,7 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
let path = codex_home.path().join(CLOUD_REQUIREMENTS_CACHE_FILENAME);
@ -942,7 +982,7 @@ mod tests {
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::OnRequest]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -950,7 +990,7 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1);
}
@ -988,7 +1028,7 @@ mod tests {
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::OnRequest]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -996,7 +1036,7 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1);
}
@ -1038,7 +1078,7 @@ mod tests {
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -1046,7 +1086,7 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1);
}
@ -1089,7 +1129,7 @@ mod tests {
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -1097,7 +1137,7 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1);
}
@ -1171,7 +1211,7 @@ mod tests {
CLOUD_REQUIREMENTS_TIMEOUT,
);
assert!(service.fetch().await.is_none());
assert_eq!(service.fetch().await, Ok(None));
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1);
}
@ -1196,7 +1236,14 @@ mod tests {
tokio::time::advance(Duration::from_secs(5)).await;
tokio::task::yield_now().await;
assert!(handle.await.expect("cloud requirements task").is_none());
let err = handle
.await
.expect("cloud requirements task")
.expect_err("cloud requirements retry exhaustion should fail closed");
assert_eq!(
err.to_string(),
"failed to load your workspace-managed config"
);
assert_eq!(
fetcher.request_count.load(Ordering::SeqCst),
CLOUD_REQUIREMENTS_MAX_ATTEMPTS
@ -1221,7 +1268,7 @@ mod tests {
assert_eq!(
service.fetch().await,
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -1229,10 +1276,10 @@ mod tests {
rules: None,
enforce_residency: None,
network: None,
})
}))
);
service.refresh_cache().await;
assert!(service.refresh_cache().await);
let path = codex_home.path().join(CLOUD_REQUIREMENTS_CACHE_FILENAME);
let cache_file: CloudRequirementsCacheFile =

View file

@ -4,24 +4,42 @@ use futures::future::FutureExt;
use futures::future::Shared;
use std::fmt;
use std::future::Future;
use thiserror::Error;
#[derive(Clone, Debug, Eq, Error, PartialEq)]
#[error("{message}")]
pub struct CloudRequirementsLoadError {
message: String,
}
impl CloudRequirementsLoadError {
pub fn new(message: impl Into<String>) -> Self {
Self {
message: message.into(),
}
}
}
#[derive(Clone)]
pub struct CloudRequirementsLoader {
// TODO(gt): This should return a Result once we can fail-closed.
fut: Shared<BoxFuture<'static, Option<ConfigRequirementsToml>>>,
fut: Shared<
BoxFuture<'static, Result<Option<ConfigRequirementsToml>, CloudRequirementsLoadError>>,
>,
}
impl CloudRequirementsLoader {
pub fn new<F>(fut: F) -> Self
where
F: Future<Output = Option<ConfigRequirementsToml>> + Send + 'static,
F: Future<Output = Result<Option<ConfigRequirementsToml>, CloudRequirementsLoadError>>
+ Send
+ 'static,
{
Self {
fut: fut.boxed().shared(),
}
}
pub async fn get(&self) -> Option<ConfigRequirementsToml> {
pub async fn get(&self) -> Result<Option<ConfigRequirementsToml>, CloudRequirementsLoadError> {
self.fut.clone().await
}
}
@ -34,7 +52,7 @@ impl fmt::Debug for CloudRequirementsLoader {
impl Default for CloudRequirementsLoader {
fn default() -> Self {
Self::new(async { None })
Self::new(async { Ok(None) })
}
}
@ -52,7 +70,7 @@ mod tests {
let counter_clone = Arc::clone(&counter);
let loader = CloudRequirementsLoader::new(async move {
counter_clone.fetch_add(1, Ordering::SeqCst);
Some(ConfigRequirementsToml::default())
Ok(Some(ConfigRequirementsToml::default()))
});
let (first, second) = tokio::join!(loader.get(), loader.get());

View file

@ -10,6 +10,7 @@ mod state;
pub const CONFIG_TOML_FILE: &str = "config.toml";
pub use cloud_requirements::CloudRequirementsLoadError;
pub use cloud_requirements::CloudRequirementsLoader;
pub use config_requirements::ConfigRequirements;
pub use config_requirements::ConfigRequirementsToml;

View file

@ -5921,12 +5921,12 @@ mcp_oauth_callback_url = "https://example.com/callback"
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.cloud_requirements(CloudRequirementsLoader::new(async {
Some(crate::config_loader::ConfigRequirementsToml {
Ok(Some(crate::config_loader::ConfigRequirementsToml {
allowed_sandbox_modes: Some(vec![
crate::config_loader::SandboxModeRequirement::ReadOnly,
]),
..Default::default()
})
}))
}))
.build()
.await?;
@ -5962,9 +5962,9 @@ mcp_oauth_callback_url = "https://example.com/callback"
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.cloud_requirements(CloudRequirementsLoader::new(
async move { Some(requirements) },
))
.cloud_requirements(CloudRequirementsLoader::new(async move {
Ok(Some(requirements))
}))
.build()
.await?;
assert_eq!(
@ -5988,12 +5988,12 @@ mcp_oauth_callback_url = "https://example.com/callback"
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.cloud_requirements(CloudRequirementsLoader::new(async {
Some(crate::config_loader::ConfigRequirementsToml {
Ok(Some(crate::config_loader::ConfigRequirementsToml {
allowed_web_search_modes: Some(vec![
crate::config_loader::WebSearchModeRequirement::Cached,
]),
..Default::default()
})
}))
}))
.build()
.await?;
@ -6029,10 +6029,10 @@ trust_level = "untrusted"
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(workspace.path().to_path_buf()))
.cloud_requirements(CloudRequirementsLoader::new(async {
Some(crate::config_loader::ConfigRequirementsToml {
Ok(Some(crate::config_loader::ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::OnRequest]),
..Default::default()
})
}))
}))
.build()
.await?;
@ -6058,10 +6058,10 @@ trust_level = "untrusted"
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.cloud_requirements(CloudRequirementsLoader::new(async {
Some(crate::config_loader::ConfigRequirementsToml {
Ok(Some(crate::config_loader::ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::OnRequest]),
..Default::default()
})
}))
}))
.build()
.await?;

View file

@ -25,6 +25,7 @@ use std::path::Path;
use std::path::PathBuf;
use toml::Value as TomlValue;
pub use codex_config::CloudRequirementsLoadError;
pub use codex_config::CloudRequirementsLoader;
pub use codex_config::ConfigError;
pub use codex_config::ConfigLayerEntry;
@ -116,7 +117,7 @@ pub async fn load_config_layers_state(
) -> io::Result<ConfigLayerStack> {
let mut config_requirements_toml = ConfigRequirementsWithSources::default();
if let Some(requirements) = cloud_requirements.get().await {
if let Some(requirements) = cloud_requirements.get().await.map_err(io::Error::other)? {
config_requirements_toml
.merge_unset_fields(RequirementSource::CloudRequirements, requirements);
}

View file

@ -5,6 +5,7 @@ use crate::config::ConfigOverrides;
use crate::config::ConfigToml;
use crate::config::ConstraintError;
use crate::config::ProjectConfig;
use crate::config_loader::CloudRequirementsLoadError;
use crate::config_loader::CloudRequirementsLoader;
use crate::config_loader::ConfigLayerEntry;
use crate::config_loader::ConfigLoadError;
@ -576,7 +577,7 @@ allowed_approval_policies = ["on-request"]
..LoaderOverrides::default()
},
CloudRequirementsLoader::new(async {
Some(ConfigRequirementsToml {
Ok(Some(ConfigRequirementsToml {
allowed_approval_policies: Some(vec![AskForApproval::Never]),
allowed_sandbox_modes: None,
allowed_web_search_modes: None,
@ -584,7 +585,7 @@ allowed_approval_policies = ["on-request"]
rules: None,
enforce_residency: None,
network: None,
})
}))
}),
)
.await?;
@ -671,7 +672,7 @@ async fn load_config_layers_includes_cloud_requirements() -> anyhow::Result<()>
network: None,
};
let expected = requirements.clone();
let cloud_requirements = CloudRequirementsLoader::new(async move { Some(requirements) });
let cloud_requirements = CloudRequirementsLoader::new(async move { Ok(Some(requirements)) });
let layers = load_config_layers_state(
&codex_home,
@ -702,6 +703,31 @@ async fn load_config_layers_includes_cloud_requirements() -> anyhow::Result<()>
Ok(())
}
#[tokio::test]
async fn load_config_layers_fails_when_cloud_requirements_loader_fails() -> anyhow::Result<()> {
let tmp = tempdir()?;
let codex_home = tmp.path().join("home");
tokio::fs::create_dir_all(&codex_home).await?;
let cwd = AbsolutePathBuf::from_absolute_path(tmp.path())?;
let err = load_config_layers_state(
&codex_home,
Some(cwd),
&[] as &[(String, TomlValue)],
LoaderOverrides::default(),
CloudRequirementsLoader::new(async {
Err(CloudRequirementsLoadError::new("cloud requirements failed"))
}),
)
.await
.expect_err("cloud requirements failure should fail closed");
assert_eq!(err.kind(), std::io::ErrorKind::Other);
assert!(err.to_string().contains("cloud requirements failed"));
Ok(())
}
#[tokio::test]
async fn project_layers_prefer_closest_cwd() -> std::io::Result<()> {
let tmp = tempdir()?;