Add granular metrics for cloud requirements load (#14108)
This commit is contained in:
parent
d751e68f44
commit
3d4628c9c4
1 changed files with 192 additions and 60 deletions
|
|
@ -45,7 +45,11 @@ const CLOUD_REQUIREMENTS_MAX_ATTEMPTS: usize = 5;
|
|||
const CLOUD_REQUIREMENTS_CACHE_FILENAME: &str = "cloud-requirements-cache.json";
|
||||
const CLOUD_REQUIREMENTS_CACHE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
|
||||
const CLOUD_REQUIREMENTS_CACHE_TTL: Duration = Duration::from_secs(30 * 60);
|
||||
const CLOUD_REQUIREMENTS_FETCH_ATTEMPT_METRIC: &str = "codex.cloud_requirements.fetch_attempt";
|
||||
const CLOUD_REQUIREMENTS_FETCH_FINAL_METRIC: &str = "codex.cloud_requirements.fetch_final";
|
||||
const CLOUD_REQUIREMENTS_LOAD_METRIC: &str = "codex.cloud_requirements.load";
|
||||
const CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE: &str = "failed to load your workspace-managed config";
|
||||
const CLOUD_REQUIREMENTS_AUTH_RECOVERY_FAILED_MESSAGE: &str = "Your authentication session could not be refreshed automatically. Please log out and sign in again.";
|
||||
const CLOUD_REQUIREMENTS_CACHE_WRITE_HMAC_KEY: &[u8] =
|
||||
b"codex-cloud-requirements-cache-v3-064f8542-75b4-494c-a294-97d3ce597271";
|
||||
const CLOUD_REQUIREMENTS_CACHE_READ_HMAC_KEYS: &[&[u8]] =
|
||||
|
|
@ -59,15 +63,27 @@ fn refresher_task_slot() -> &'static Mutex<Option<JoinHandle<()>>> {
|
|||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum FetchCloudRequirementsStatus {
|
||||
enum RetryableFailureKind {
|
||||
BackendClientInit,
|
||||
Request,
|
||||
Request { status_code: Option<u16> },
|
||||
}
|
||||
|
||||
impl RetryableFailureKind {
|
||||
fn status_code(self) -> Option<u16> {
|
||||
match self {
|
||||
Self::BackendClientInit => None,
|
||||
Self::Request { status_code } => status_code,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
enum FetchCloudRequirementsError {
|
||||
Retryable(FetchCloudRequirementsStatus),
|
||||
Unauthorized(CloudRequirementsLoadError),
|
||||
enum FetchAttemptError {
|
||||
Retryable(RetryableFailureKind),
|
||||
Unauthorized {
|
||||
status_code: Option<u16>,
|
||||
error: CloudRequirementsLoadError,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, Error, PartialEq)]
|
||||
|
|
@ -171,7 +187,7 @@ trait RequirementsFetcher: Send + Sync {
|
|||
async fn fetch_requirements(
|
||||
&self,
|
||||
auth: &CodexAuth,
|
||||
) -> Result<Option<String>, FetchCloudRequirementsError>;
|
||||
) -> Result<Option<String>, FetchAttemptError>;
|
||||
}
|
||||
|
||||
struct BackendRequirementsFetcher {
|
||||
|
|
@ -189,7 +205,7 @@ impl RequirementsFetcher for BackendRequirementsFetcher {
|
|||
async fn fetch_requirements(
|
||||
&self,
|
||||
auth: &CodexAuth,
|
||||
) -> Result<Option<String>, FetchCloudRequirementsError> {
|
||||
) -> Result<Option<String>, FetchAttemptError> {
|
||||
let client = BackendClient::from_auth(self.base_url.clone(), auth)
|
||||
.inspect_err(|err| {
|
||||
tracing::warn!(
|
||||
|
|
@ -197,23 +213,21 @@ impl RequirementsFetcher for BackendRequirementsFetcher {
|
|||
"Failed to construct backend client for cloud requirements"
|
||||
);
|
||||
})
|
||||
.map_err(|_| {
|
||||
FetchCloudRequirementsError::Retryable(
|
||||
FetchCloudRequirementsStatus::BackendClientInit,
|
||||
)
|
||||
})?;
|
||||
.map_err(|_| FetchAttemptError::Retryable(RetryableFailureKind::BackendClientInit))?;
|
||||
|
||||
let response = client
|
||||
.get_config_requirements_file()
|
||||
.await
|
||||
.inspect_err(|err| tracing::warn!(error = %err, "Failed to fetch cloud requirements"))
|
||||
.map_err(|err| {
|
||||
let status_code = err.status().map(|status| status.as_u16());
|
||||
if err.is_unauthorized() {
|
||||
FetchCloudRequirementsError::Unauthorized(CloudRequirementsLoadError::new(
|
||||
err.to_string(),
|
||||
))
|
||||
FetchAttemptError::Unauthorized {
|
||||
status_code,
|
||||
error: CloudRequirementsLoadError::new(err.to_string()),
|
||||
}
|
||||
} else {
|
||||
FetchCloudRequirementsError::Retryable(FetchCloudRequirementsStatus::Request)
|
||||
FetchAttemptError::Retryable(RetryableFailureKind::Request { status_code })
|
||||
}
|
||||
})?;
|
||||
|
||||
|
|
@ -257,7 +271,7 @@ impl CloudRequirementsService {
|
|||
let _timer =
|
||||
codex_otel::start_global_timer("codex.cloud_requirements.fetch.duration_ms", &[]);
|
||||
let started_at = Instant::now();
|
||||
let result = timeout(self.timeout, self.fetch())
|
||||
let fetch_result = timeout(self.timeout, self.fetch())
|
||||
.await
|
||||
.inspect_err(|_| {
|
||||
let message = format!(
|
||||
|
|
@ -265,20 +279,22 @@ impl CloudRequirementsService {
|
|||
self.timeout.as_secs()
|
||||
);
|
||||
tracing::error!("{message}");
|
||||
if let Some(metrics) = codex_otel::metrics::global() {
|
||||
let _ = metrics.counter(
|
||||
"codex.cloud_requirements.load_failure",
|
||||
1,
|
||||
&[("trigger", "startup")],
|
||||
);
|
||||
}
|
||||
emit_load_metric("startup", "error");
|
||||
})
|
||||
.map_err(|_| {
|
||||
CloudRequirementsLoadError::new(format!(
|
||||
"timed out waiting for cloud requirements after {}s",
|
||||
self.timeout.as_secs()
|
||||
))
|
||||
})??;
|
||||
})?;
|
||||
|
||||
let result = match fetch_result {
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
emit_load_metric("startup", "error");
|
||||
return Err(err);
|
||||
}
|
||||
};
|
||||
|
||||
match result.as_ref() {
|
||||
Some(requirements) => {
|
||||
|
|
@ -287,12 +303,14 @@ impl CloudRequirementsService {
|
|||
requirements = ?requirements,
|
||||
"Cloud requirements load completed"
|
||||
);
|
||||
emit_load_metric("startup", "success");
|
||||
}
|
||||
None => {
|
||||
tracing::info!(
|
||||
elapsed_ms = started_at.elapsed().as_millis(),
|
||||
"Cloud requirements load completed (none)"
|
||||
);
|
||||
emit_load_metric("startup", "success");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -329,20 +347,28 @@ impl CloudRequirementsService {
|
|||
}
|
||||
}
|
||||
|
||||
self.fetch_with_retries(auth).await
|
||||
self.fetch_with_retries(auth, "startup").await
|
||||
}
|
||||
|
||||
async fn fetch_with_retries(
|
||||
&self,
|
||||
mut auth: CodexAuth,
|
||||
trigger: &'static str,
|
||||
) -> Result<Option<ConfigRequirementsToml>, CloudRequirementsLoadError> {
|
||||
let mut attempt = 1;
|
||||
let mut last_status_code: Option<u16> = None;
|
||||
let mut auth_recovery = self.auth_manager.unauthorized_recovery();
|
||||
|
||||
while attempt <= CLOUD_REQUIREMENTS_MAX_ATTEMPTS {
|
||||
let contents = match self.fetcher.fetch_requirements(&auth).await {
|
||||
Ok(contents) => contents,
|
||||
Err(FetchCloudRequirementsError::Retryable(status)) => {
|
||||
Ok(contents) => {
|
||||
emit_fetch_attempt_metric(trigger, attempt, "success", None);
|
||||
contents
|
||||
}
|
||||
Err(FetchAttemptError::Retryable(status)) => {
|
||||
let status_code = status.status_code();
|
||||
last_status_code = status_code;
|
||||
emit_fetch_attempt_metric(trigger, attempt, "error", status_code);
|
||||
if attempt < CLOUD_REQUIREMENTS_MAX_ATTEMPTS {
|
||||
tracing::warn!(
|
||||
status = ?status,
|
||||
|
|
@ -355,7 +381,9 @@ impl CloudRequirementsService {
|
|||
attempt += 1;
|
||||
continue;
|
||||
}
|
||||
Err(FetchCloudRequirementsError::Unauthorized(err)) => {
|
||||
Err(FetchAttemptError::Unauthorized { status_code, error }) => {
|
||||
last_status_code = status_code;
|
||||
emit_fetch_attempt_metric(trigger, attempt, "unauthorized", status_code);
|
||||
if auth_recovery.has_next() {
|
||||
tracing::warn!(
|
||||
attempt,
|
||||
|
|
@ -368,8 +396,15 @@ impl CloudRequirementsService {
|
|||
tracing::error!(
|
||||
"Auth recovery succeeded but no auth is available for cloud requirements"
|
||||
);
|
||||
emit_fetch_final_metric(
|
||||
trigger,
|
||||
"error",
|
||||
"auth_recovery_missing_auth",
|
||||
attempt,
|
||||
status_code,
|
||||
);
|
||||
return Err(CloudRequirementsLoadError::new(
|
||||
CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE,
|
||||
CLOUD_REQUIREMENTS_AUTH_RECOVERY_FAILED_MESSAGE,
|
||||
));
|
||||
};
|
||||
auth = refreshed_auth;
|
||||
|
|
@ -380,6 +415,13 @@ impl CloudRequirementsService {
|
|||
error = %failed,
|
||||
"Failed to recover from unauthorized cloud requirements request"
|
||||
);
|
||||
emit_fetch_final_metric(
|
||||
trigger,
|
||||
"error",
|
||||
"auth_recovery_unrecoverable",
|
||||
attempt,
|
||||
status_code,
|
||||
);
|
||||
return Err(CloudRequirementsLoadError::new(failed.message));
|
||||
}
|
||||
Err(RefreshTokenError::Transient(recovery_err)) => {
|
||||
|
|
@ -399,11 +441,18 @@ impl CloudRequirementsService {
|
|||
}
|
||||
|
||||
tracing::warn!(
|
||||
error = %err,
|
||||
error = %error,
|
||||
"Cloud requirements request was unauthorized and no auth recovery is available"
|
||||
);
|
||||
emit_fetch_final_metric(
|
||||
trigger,
|
||||
"error",
|
||||
"auth_recovery_unavailable",
|
||||
attempt,
|
||||
status_code,
|
||||
);
|
||||
return Err(CloudRequirementsLoadError::new(
|
||||
CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE,
|
||||
CLOUD_REQUIREMENTS_AUTH_RECOVERY_FAILED_MESSAGE,
|
||||
));
|
||||
}
|
||||
};
|
||||
|
|
@ -413,6 +462,13 @@ impl CloudRequirementsService {
|
|||
Ok(requirements) => requirements,
|
||||
Err(err) => {
|
||||
tracing::error!(error = %err, "Failed to parse cloud requirements");
|
||||
emit_fetch_final_metric(
|
||||
trigger,
|
||||
"error",
|
||||
"parse_error",
|
||||
attempt,
|
||||
last_status_code,
|
||||
);
|
||||
return Err(CloudRequirementsLoadError::new(
|
||||
CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE,
|
||||
));
|
||||
|
|
@ -426,9 +482,17 @@ impl CloudRequirementsService {
|
|||
tracing::warn!(error = %err, "Failed to write cloud requirements cache");
|
||||
}
|
||||
|
||||
emit_fetch_final_metric(trigger, "success", "none", attempt, None);
|
||||
return Ok(requirements);
|
||||
}
|
||||
|
||||
emit_fetch_final_metric(
|
||||
trigger,
|
||||
"error",
|
||||
"request_retry_exhausted",
|
||||
CLOUD_REQUIREMENTS_MAX_ATTEMPTS,
|
||||
last_status_code,
|
||||
);
|
||||
tracing::error!(
|
||||
path = %self.cache_path.display(),
|
||||
"{CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE}"
|
||||
|
|
@ -448,6 +512,7 @@ impl CloudRequirementsService {
|
|||
tracing::error!(
|
||||
"Timed out refreshing cloud requirements cache from remote; keeping existing cache"
|
||||
);
|
||||
emit_load_metric("refresh", "error");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -466,18 +531,15 @@ impl CloudRequirementsService {
|
|||
return false;
|
||||
}
|
||||
|
||||
if let Err(err) = self.fetch_with_retries(auth).await {
|
||||
tracing::error!(
|
||||
path = %self.cache_path.display(),
|
||||
error = %err,
|
||||
"Failed to refresh cloud requirements cache from remote"
|
||||
);
|
||||
if let Some(metrics) = codex_otel::metrics::global() {
|
||||
let _ = metrics.counter(
|
||||
"codex.cloud_requirements.load_failure",
|
||||
1,
|
||||
&[("trigger", "refresh")],
|
||||
match self.fetch_with_retries(auth, "refresh").await {
|
||||
Ok(_) => emit_load_metric("refresh", "success"),
|
||||
Err(err) => {
|
||||
tracing::error!(
|
||||
path = %self.cache_path.display(),
|
||||
error = %err,
|
||||
"Failed to refresh cloud requirements cache from remote"
|
||||
);
|
||||
emit_load_metric("refresh", "error");
|
||||
}
|
||||
}
|
||||
true
|
||||
|
|
@ -644,6 +706,72 @@ fn parse_cloud_requirements(
|
|||
}
|
||||
}
|
||||
|
||||
fn emit_fetch_attempt_metric(
|
||||
trigger: &str,
|
||||
attempt: usize,
|
||||
outcome: &str,
|
||||
status_code: Option<u16>,
|
||||
) {
|
||||
let attempt_tag = attempt.to_string();
|
||||
let status_code_tag = status_code_tag(status_code);
|
||||
emit_metric(
|
||||
CLOUD_REQUIREMENTS_FETCH_ATTEMPT_METRIC,
|
||||
vec![
|
||||
("trigger", trigger.to_string()),
|
||||
("attempt", attempt_tag),
|
||||
("outcome", outcome.to_string()),
|
||||
("status_code", status_code_tag),
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
fn emit_fetch_final_metric(
|
||||
trigger: &str,
|
||||
outcome: &str,
|
||||
reason: &str,
|
||||
attempt_count: usize,
|
||||
status_code: Option<u16>,
|
||||
) {
|
||||
let attempt_count_tag = attempt_count.to_string();
|
||||
let status_code_tag = status_code_tag(status_code);
|
||||
emit_metric(
|
||||
CLOUD_REQUIREMENTS_FETCH_FINAL_METRIC,
|
||||
vec![
|
||||
("trigger", trigger.to_string()),
|
||||
("outcome", outcome.to_string()),
|
||||
("reason", reason.to_string()),
|
||||
("attempt_count", attempt_count_tag),
|
||||
("status_code", status_code_tag),
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
fn emit_load_metric(trigger: &str, outcome: &str) {
|
||||
emit_metric(
|
||||
CLOUD_REQUIREMENTS_LOAD_METRIC,
|
||||
vec![
|
||||
("trigger", trigger.to_string()),
|
||||
("outcome", outcome.to_string()),
|
||||
],
|
||||
);
|
||||
}
|
||||
|
||||
fn status_code_tag(status_code: Option<u16>) -> String {
|
||||
status_code
|
||||
.map(|status_code| status_code.to_string())
|
||||
.unwrap_or_else(|| "none".to_string())
|
||||
}
|
||||
|
||||
fn emit_metric(metric_name: &str, tags: Vec<(&str, String)>) {
|
||||
if let Some(metrics) = codex_otel::metrics::global() {
|
||||
let tag_refs = tags
|
||||
.iter()
|
||||
.map(|(key, value)| (*key, value.as_str()))
|
||||
.collect::<Vec<_>>();
|
||||
let _ = metrics.counter(metric_name, 1, &tag_refs);
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
@ -803,8 +931,8 @@ mod tests {
|
|||
contents.and_then(|contents| parse_cloud_requirements(contents).ok().flatten())
|
||||
}
|
||||
|
||||
fn request_error() -> FetchCloudRequirementsError {
|
||||
FetchCloudRequirementsError::Retryable(FetchCloudRequirementsStatus::Request)
|
||||
fn request_error() -> FetchAttemptError {
|
||||
FetchAttemptError::Retryable(RetryableFailureKind::Request { status_code: None })
|
||||
}
|
||||
|
||||
struct StaticFetcher {
|
||||
|
|
@ -816,7 +944,7 @@ mod tests {
|
|||
async fn fetch_requirements(
|
||||
&self,
|
||||
_auth: &CodexAuth,
|
||||
) -> Result<Option<String>, FetchCloudRequirementsError> {
|
||||
) -> Result<Option<String>, FetchAttemptError> {
|
||||
Ok(self.contents.clone())
|
||||
}
|
||||
}
|
||||
|
|
@ -828,20 +956,19 @@ mod tests {
|
|||
async fn fetch_requirements(
|
||||
&self,
|
||||
_auth: &CodexAuth,
|
||||
) -> Result<Option<String>, FetchCloudRequirementsError> {
|
||||
) -> Result<Option<String>, FetchAttemptError> {
|
||||
pending::<()>().await;
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
struct SequenceFetcher {
|
||||
responses:
|
||||
tokio::sync::Mutex<VecDeque<Result<Option<String>, FetchCloudRequirementsError>>>,
|
||||
responses: tokio::sync::Mutex<VecDeque<Result<Option<String>, FetchAttemptError>>>,
|
||||
request_count: AtomicUsize,
|
||||
}
|
||||
|
||||
impl SequenceFetcher {
|
||||
fn new(responses: Vec<Result<Option<String>, FetchCloudRequirementsError>>) -> Self {
|
||||
fn new(responses: Vec<Result<Option<String>, FetchAttemptError>>) -> Self {
|
||||
Self {
|
||||
responses: tokio::sync::Mutex::new(VecDeque::from(responses)),
|
||||
request_count: AtomicUsize::new(0),
|
||||
|
|
@ -854,7 +981,7 @@ mod tests {
|
|||
async fn fetch_requirements(
|
||||
&self,
|
||||
_auth: &CodexAuth,
|
||||
) -> Result<Option<String>, FetchCloudRequirementsError> {
|
||||
) -> Result<Option<String>, FetchAttemptError> {
|
||||
self.request_count.fetch_add(1, Ordering::SeqCst);
|
||||
let mut responses = self.responses.lock().await;
|
||||
responses.pop_front().unwrap_or(Ok(None))
|
||||
|
|
@ -872,7 +999,7 @@ mod tests {
|
|||
async fn fetch_requirements(
|
||||
&self,
|
||||
auth: &CodexAuth,
|
||||
) -> Result<Option<String>, FetchCloudRequirementsError> {
|
||||
) -> Result<Option<String>, FetchAttemptError> {
|
||||
self.request_count.fetch_add(1, Ordering::SeqCst);
|
||||
if matches!(
|
||||
auth.get_token().as_deref(),
|
||||
|
|
@ -880,9 +1007,10 @@ mod tests {
|
|||
) {
|
||||
Ok(Some(self.contents.clone()))
|
||||
} else {
|
||||
Err(FetchCloudRequirementsError::Unauthorized(
|
||||
CloudRequirementsLoadError::new("GET /config/requirements failed: 401"),
|
||||
))
|
||||
Err(FetchAttemptError::Unauthorized {
|
||||
status_code: Some(401),
|
||||
error: CloudRequirementsLoadError::new("GET /config/requirements failed: 401"),
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -897,11 +1025,12 @@ mod tests {
|
|||
async fn fetch_requirements(
|
||||
&self,
|
||||
_auth: &CodexAuth,
|
||||
) -> Result<Option<String>, FetchCloudRequirementsError> {
|
||||
) -> Result<Option<String>, FetchAttemptError> {
|
||||
self.request_count.fetch_add(1, Ordering::SeqCst);
|
||||
Err(FetchCloudRequirementsError::Unauthorized(
|
||||
CloudRequirementsLoadError::new(self.message.clone()),
|
||||
))
|
||||
Err(FetchAttemptError::Unauthorized {
|
||||
status_code: Some(401),
|
||||
error: CloudRequirementsLoadError::new(self.message.clone()),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1252,7 +1381,10 @@ mod tests {
|
|||
.fetch()
|
||||
.await
|
||||
.expect_err("cloud requirements should fail closed");
|
||||
assert_eq!(err.to_string(), CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE);
|
||||
assert_eq!(
|
||||
err.to_string(),
|
||||
CLOUD_REQUIREMENTS_AUTH_RECOVERY_FAILED_MESSAGE
|
||||
);
|
||||
assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue