diff --git a/codex-rs/cloud-requirements/src/lib.rs b/codex-rs/cloud-requirements/src/lib.rs index b71a1af51ce..94f78edcc55 100644 --- a/codex-rs/cloud-requirements/src/lib.rs +++ b/codex-rs/cloud-requirements/src/lib.rs @@ -45,7 +45,11 @@ const CLOUD_REQUIREMENTS_MAX_ATTEMPTS: usize = 5; const CLOUD_REQUIREMENTS_CACHE_FILENAME: &str = "cloud-requirements-cache.json"; const CLOUD_REQUIREMENTS_CACHE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60); const CLOUD_REQUIREMENTS_CACHE_TTL: Duration = Duration::from_secs(30 * 60); +const CLOUD_REQUIREMENTS_FETCH_ATTEMPT_METRIC: &str = "codex.cloud_requirements.fetch_attempt"; +const CLOUD_REQUIREMENTS_FETCH_FINAL_METRIC: &str = "codex.cloud_requirements.fetch_final"; +const CLOUD_REQUIREMENTS_LOAD_METRIC: &str = "codex.cloud_requirements.load"; const CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE: &str = "failed to load your workspace-managed config"; +const CLOUD_REQUIREMENTS_AUTH_RECOVERY_FAILED_MESSAGE: &str = "Your authentication session could not be refreshed automatically. Please log out and sign in again."; const CLOUD_REQUIREMENTS_CACHE_WRITE_HMAC_KEY: &[u8] = b"codex-cloud-requirements-cache-v3-064f8542-75b4-494c-a294-97d3ce597271"; const CLOUD_REQUIREMENTS_CACHE_READ_HMAC_KEYS: &[&[u8]] = @@ -59,15 +63,27 @@ fn refresher_task_slot() -> &'static Mutex>> { } #[derive(Clone, Copy, Debug, Eq, PartialEq)] -enum FetchCloudRequirementsStatus { +enum RetryableFailureKind { BackendClientInit, - Request, + Request { status_code: Option }, +} + +impl RetryableFailureKind { + fn status_code(self) -> Option { + match self { + Self::BackendClientInit => None, + Self::Request { status_code } => status_code, + } + } } #[derive(Clone, Debug, Eq, PartialEq)] -enum FetchCloudRequirementsError { - Retryable(FetchCloudRequirementsStatus), - Unauthorized(CloudRequirementsLoadError), +enum FetchAttemptError { + Retryable(RetryableFailureKind), + Unauthorized { + status_code: Option, + error: CloudRequirementsLoadError, + }, } #[derive(Clone, Debug, Eq, Error, PartialEq)] @@ -171,7 +187,7 @@ trait RequirementsFetcher: Send + Sync { async fn fetch_requirements( &self, auth: &CodexAuth, - ) -> Result, FetchCloudRequirementsError>; + ) -> Result, FetchAttemptError>; } struct BackendRequirementsFetcher { @@ -189,7 +205,7 @@ impl RequirementsFetcher for BackendRequirementsFetcher { async fn fetch_requirements( &self, auth: &CodexAuth, - ) -> Result, FetchCloudRequirementsError> { + ) -> Result, FetchAttemptError> { let client = BackendClient::from_auth(self.base_url.clone(), auth) .inspect_err(|err| { tracing::warn!( @@ -197,23 +213,21 @@ impl RequirementsFetcher for BackendRequirementsFetcher { "Failed to construct backend client for cloud requirements" ); }) - .map_err(|_| { - FetchCloudRequirementsError::Retryable( - FetchCloudRequirementsStatus::BackendClientInit, - ) - })?; + .map_err(|_| FetchAttemptError::Retryable(RetryableFailureKind::BackendClientInit))?; let response = client .get_config_requirements_file() .await .inspect_err(|err| tracing::warn!(error = %err, "Failed to fetch cloud requirements")) .map_err(|err| { + let status_code = err.status().map(|status| status.as_u16()); if err.is_unauthorized() { - FetchCloudRequirementsError::Unauthorized(CloudRequirementsLoadError::new( - err.to_string(), - )) + FetchAttemptError::Unauthorized { + status_code, + error: CloudRequirementsLoadError::new(err.to_string()), + } } else { - FetchCloudRequirementsError::Retryable(FetchCloudRequirementsStatus::Request) + FetchAttemptError::Retryable(RetryableFailureKind::Request { status_code }) } })?; @@ -257,7 +271,7 @@ impl CloudRequirementsService { let _timer = codex_otel::start_global_timer("codex.cloud_requirements.fetch.duration_ms", &[]); let started_at = Instant::now(); - let result = timeout(self.timeout, self.fetch()) + let fetch_result = timeout(self.timeout, self.fetch()) .await .inspect_err(|_| { let message = format!( @@ -265,20 +279,22 @@ impl CloudRequirementsService { self.timeout.as_secs() ); tracing::error!("{message}"); - if let Some(metrics) = codex_otel::metrics::global() { - let _ = metrics.counter( - "codex.cloud_requirements.load_failure", - 1, - &[("trigger", "startup")], - ); - } + emit_load_metric("startup", "error"); }) .map_err(|_| { CloudRequirementsLoadError::new(format!( "timed out waiting for cloud requirements after {}s", self.timeout.as_secs() )) - })??; + })?; + + let result = match fetch_result { + Ok(result) => result, + Err(err) => { + emit_load_metric("startup", "error"); + return Err(err); + } + }; match result.as_ref() { Some(requirements) => { @@ -287,12 +303,14 @@ impl CloudRequirementsService { requirements = ?requirements, "Cloud requirements load completed" ); + emit_load_metric("startup", "success"); } None => { tracing::info!( elapsed_ms = started_at.elapsed().as_millis(), "Cloud requirements load completed (none)" ); + emit_load_metric("startup", "success"); } } @@ -329,20 +347,28 @@ impl CloudRequirementsService { } } - self.fetch_with_retries(auth).await + self.fetch_with_retries(auth, "startup").await } async fn fetch_with_retries( &self, mut auth: CodexAuth, + trigger: &'static str, ) -> Result, CloudRequirementsLoadError> { let mut attempt = 1; + let mut last_status_code: Option = None; let mut auth_recovery = self.auth_manager.unauthorized_recovery(); while attempt <= CLOUD_REQUIREMENTS_MAX_ATTEMPTS { let contents = match self.fetcher.fetch_requirements(&auth).await { - Ok(contents) => contents, - Err(FetchCloudRequirementsError::Retryable(status)) => { + Ok(contents) => { + emit_fetch_attempt_metric(trigger, attempt, "success", None); + contents + } + Err(FetchAttemptError::Retryable(status)) => { + let status_code = status.status_code(); + last_status_code = status_code; + emit_fetch_attempt_metric(trigger, attempt, "error", status_code); if attempt < CLOUD_REQUIREMENTS_MAX_ATTEMPTS { tracing::warn!( status = ?status, @@ -355,7 +381,9 @@ impl CloudRequirementsService { attempt += 1; continue; } - Err(FetchCloudRequirementsError::Unauthorized(err)) => { + Err(FetchAttemptError::Unauthorized { status_code, error }) => { + last_status_code = status_code; + emit_fetch_attempt_metric(trigger, attempt, "unauthorized", status_code); if auth_recovery.has_next() { tracing::warn!( attempt, @@ -368,8 +396,15 @@ impl CloudRequirementsService { tracing::error!( "Auth recovery succeeded but no auth is available for cloud requirements" ); + emit_fetch_final_metric( + trigger, + "error", + "auth_recovery_missing_auth", + attempt, + status_code, + ); return Err(CloudRequirementsLoadError::new( - CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE, + CLOUD_REQUIREMENTS_AUTH_RECOVERY_FAILED_MESSAGE, )); }; auth = refreshed_auth; @@ -380,6 +415,13 @@ impl CloudRequirementsService { error = %failed, "Failed to recover from unauthorized cloud requirements request" ); + emit_fetch_final_metric( + trigger, + "error", + "auth_recovery_unrecoverable", + attempt, + status_code, + ); return Err(CloudRequirementsLoadError::new(failed.message)); } Err(RefreshTokenError::Transient(recovery_err)) => { @@ -399,11 +441,18 @@ impl CloudRequirementsService { } tracing::warn!( - error = %err, + error = %error, "Cloud requirements request was unauthorized and no auth recovery is available" ); + emit_fetch_final_metric( + trigger, + "error", + "auth_recovery_unavailable", + attempt, + status_code, + ); return Err(CloudRequirementsLoadError::new( - CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE, + CLOUD_REQUIREMENTS_AUTH_RECOVERY_FAILED_MESSAGE, )); } }; @@ -413,6 +462,13 @@ impl CloudRequirementsService { Ok(requirements) => requirements, Err(err) => { tracing::error!(error = %err, "Failed to parse cloud requirements"); + emit_fetch_final_metric( + trigger, + "error", + "parse_error", + attempt, + last_status_code, + ); return Err(CloudRequirementsLoadError::new( CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE, )); @@ -426,9 +482,17 @@ impl CloudRequirementsService { tracing::warn!(error = %err, "Failed to write cloud requirements cache"); } + emit_fetch_final_metric(trigger, "success", "none", attempt, None); return Ok(requirements); } + emit_fetch_final_metric( + trigger, + "error", + "request_retry_exhausted", + CLOUD_REQUIREMENTS_MAX_ATTEMPTS, + last_status_code, + ); tracing::error!( path = %self.cache_path.display(), "{CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE}" @@ -448,6 +512,7 @@ impl CloudRequirementsService { tracing::error!( "Timed out refreshing cloud requirements cache from remote; keeping existing cache" ); + emit_load_metric("refresh", "error"); } } } @@ -466,18 +531,15 @@ impl CloudRequirementsService { return false; } - if let Err(err) = self.fetch_with_retries(auth).await { - tracing::error!( - path = %self.cache_path.display(), - error = %err, - "Failed to refresh cloud requirements cache from remote" - ); - if let Some(metrics) = codex_otel::metrics::global() { - let _ = metrics.counter( - "codex.cloud_requirements.load_failure", - 1, - &[("trigger", "refresh")], + match self.fetch_with_retries(auth, "refresh").await { + Ok(_) => emit_load_metric("refresh", "success"), + Err(err) => { + tracing::error!( + path = %self.cache_path.display(), + error = %err, + "Failed to refresh cloud requirements cache from remote" ); + emit_load_metric("refresh", "error"); } } true @@ -644,6 +706,72 @@ fn parse_cloud_requirements( } } +fn emit_fetch_attempt_metric( + trigger: &str, + attempt: usize, + outcome: &str, + status_code: Option, +) { + let attempt_tag = attempt.to_string(); + let status_code_tag = status_code_tag(status_code); + emit_metric( + CLOUD_REQUIREMENTS_FETCH_ATTEMPT_METRIC, + vec![ + ("trigger", trigger.to_string()), + ("attempt", attempt_tag), + ("outcome", outcome.to_string()), + ("status_code", status_code_tag), + ], + ); +} + +fn emit_fetch_final_metric( + trigger: &str, + outcome: &str, + reason: &str, + attempt_count: usize, + status_code: Option, +) { + let attempt_count_tag = attempt_count.to_string(); + let status_code_tag = status_code_tag(status_code); + emit_metric( + CLOUD_REQUIREMENTS_FETCH_FINAL_METRIC, + vec![ + ("trigger", trigger.to_string()), + ("outcome", outcome.to_string()), + ("reason", reason.to_string()), + ("attempt_count", attempt_count_tag), + ("status_code", status_code_tag), + ], + ); +} + +fn emit_load_metric(trigger: &str, outcome: &str) { + emit_metric( + CLOUD_REQUIREMENTS_LOAD_METRIC, + vec![ + ("trigger", trigger.to_string()), + ("outcome", outcome.to_string()), + ], + ); +} + +fn status_code_tag(status_code: Option) -> String { + status_code + .map(|status_code| status_code.to_string()) + .unwrap_or_else(|| "none".to_string()) +} + +fn emit_metric(metric_name: &str, tags: Vec<(&str, String)>) { + if let Some(metrics) = codex_otel::metrics::global() { + let tag_refs = tags + .iter() + .map(|(key, value)| (*key, value.as_str())) + .collect::>(); + let _ = metrics.counter(metric_name, 1, &tag_refs); + } +} + #[cfg(test)] mod tests { use super::*; @@ -803,8 +931,8 @@ mod tests { contents.and_then(|contents| parse_cloud_requirements(contents).ok().flatten()) } - fn request_error() -> FetchCloudRequirementsError { - FetchCloudRequirementsError::Retryable(FetchCloudRequirementsStatus::Request) + fn request_error() -> FetchAttemptError { + FetchAttemptError::Retryable(RetryableFailureKind::Request { status_code: None }) } struct StaticFetcher { @@ -816,7 +944,7 @@ mod tests { async fn fetch_requirements( &self, _auth: &CodexAuth, - ) -> Result, FetchCloudRequirementsError> { + ) -> Result, FetchAttemptError> { Ok(self.contents.clone()) } } @@ -828,20 +956,19 @@ mod tests { async fn fetch_requirements( &self, _auth: &CodexAuth, - ) -> Result, FetchCloudRequirementsError> { + ) -> Result, FetchAttemptError> { pending::<()>().await; Ok(None) } } struct SequenceFetcher { - responses: - tokio::sync::Mutex, FetchCloudRequirementsError>>>, + responses: tokio::sync::Mutex, FetchAttemptError>>>, request_count: AtomicUsize, } impl SequenceFetcher { - fn new(responses: Vec, FetchCloudRequirementsError>>) -> Self { + fn new(responses: Vec, FetchAttemptError>>) -> Self { Self { responses: tokio::sync::Mutex::new(VecDeque::from(responses)), request_count: AtomicUsize::new(0), @@ -854,7 +981,7 @@ mod tests { async fn fetch_requirements( &self, _auth: &CodexAuth, - ) -> Result, FetchCloudRequirementsError> { + ) -> Result, FetchAttemptError> { self.request_count.fetch_add(1, Ordering::SeqCst); let mut responses = self.responses.lock().await; responses.pop_front().unwrap_or(Ok(None)) @@ -872,7 +999,7 @@ mod tests { async fn fetch_requirements( &self, auth: &CodexAuth, - ) -> Result, FetchCloudRequirementsError> { + ) -> Result, FetchAttemptError> { self.request_count.fetch_add(1, Ordering::SeqCst); if matches!( auth.get_token().as_deref(), @@ -880,9 +1007,10 @@ mod tests { ) { Ok(Some(self.contents.clone())) } else { - Err(FetchCloudRequirementsError::Unauthorized( - CloudRequirementsLoadError::new("GET /config/requirements failed: 401"), - )) + Err(FetchAttemptError::Unauthorized { + status_code: Some(401), + error: CloudRequirementsLoadError::new("GET /config/requirements failed: 401"), + }) } } } @@ -897,11 +1025,12 @@ mod tests { async fn fetch_requirements( &self, _auth: &CodexAuth, - ) -> Result, FetchCloudRequirementsError> { + ) -> Result, FetchAttemptError> { self.request_count.fetch_add(1, Ordering::SeqCst); - Err(FetchCloudRequirementsError::Unauthorized( - CloudRequirementsLoadError::new(self.message.clone()), - )) + Err(FetchAttemptError::Unauthorized { + status_code: Some(401), + error: CloudRequirementsLoadError::new(self.message.clone()), + }) } } @@ -1252,7 +1381,10 @@ mod tests { .fetch() .await .expect_err("cloud requirements should fail closed"); - assert_eq!(err.to_string(), CLOUD_REQUIREMENTS_LOAD_FAILED_MESSAGE); + assert_eq!( + err.to_string(), + CLOUD_REQUIREMENTS_AUTH_RECOVERY_FAILED_MESSAGE + ); assert_eq!(fetcher.request_count.load(Ordering::SeqCst), 1); }