Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 32 additions & 31 deletions apollo-router/src/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const DEFAULT_INTERNAL_REDIS_TIMEOUT: Duration = Duration::from_secs(5);
const REDIS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);

/// Record a Redis error as a metric, independent of having an active connection
fn record_redis_error(error: &RedisError, caller: &'static str) {
fn record_redis_error(error: &RedisError, caller: &'static str, context: &'static str) {
// Don't track NotFound errors as they're expected for cache misses

let error_type = match error.kind() {
Expand Down Expand Up @@ -97,6 +97,7 @@ fn record_redis_error(error: &RedisError, caller: &'static str) {
if !error.is_not_found() && !error.is_canceled() {
tracing::error!(
error_type = error_type,
context = context,
caller = caller,
error = ?error,
"Redis error occurred"
Expand Down Expand Up @@ -143,14 +144,13 @@ impl Drop for DropSafeRedisPool {
fn drop(&mut self) {
let inner = self.pool.clone();
let caller = self.caller;
self.heartbeat_abort_handle.abort();
tokio::spawn(async move {
let result = inner.quit().await;
if let Err(err) = result {
tracing::warn!("Caught error while closing unused Redis connections: {err:?}");
record_redis_error(&err, caller);
}
let _ = inner
.quit()
.await
.inspect_err(|err| record_redis_error(err, caller, "shutdown"));
});
self.heartbeat_abort_handle.abort();
// Metrics collector will be dropped automatically and its Drop impl will abort the task
}
}
Expand Down Expand Up @@ -248,9 +248,9 @@ where
impl RedisCacheStorage {
pub(crate) async fn new(config: RedisCache, caller: &'static str) -> Result<Self, BoxError> {
let url = Self::preprocess_urls(config.urls)
.inspect_err(|err| record_redis_error(err, caller))?;
.inspect_err(|err| record_redis_error(err, caller, "startup"))?;
let mut client_config = RedisConfig::from_url(url.as_str())
.inspect_err(|err| record_redis_error(err, caller))?;
.inspect_err(|err| record_redis_error(err, caller, "startup"))?;
let is_cluster = client_config.server.is_clustered();

if let Some(username) = config.username {
Expand Down Expand Up @@ -358,7 +358,7 @@ impl RedisCacheStorage {
let _ = client_config
.server
.set_cluster_discovery_policy(ClusterDiscoveryPolicy::ConfigEndpoint)
.inspect_err(|err| record_redis_error(err, caller));
.inspect_err(|err| record_redis_error(err, caller, "startup"));
}
})
.with_connection_config(|config| {
Expand Down Expand Up @@ -400,14 +400,7 @@ impl RedisCacheStorage {
tokio::spawn(async move {
loop {
match error_rx.recv().await {
Ok((error, Some(server))) => {
tracing::error!("Redis client ({server:?}) error: {error:?}",);
record_redis_error(&error, caller);
}
Ok((error, None)) => {
tracing::error!("Redis client error: {error:?}",);
record_redis_error(&error, caller);
}
Ok((error, _)) => record_redis_error(&error, caller, "client"),
Err(RecvError::Lagged(_)) => continue,
Err(RecvError::Closed) => break,
}
Expand Down Expand Up @@ -500,8 +493,8 @@ impl RedisCacheStorage {
}

/// Helper method to record Redis errors for metrics
fn record_error(&self, error: &RedisError) {
record_redis_error(error, self.inner.caller);
fn record_query_error(&self, error: &RedisError) {
record_redis_error(error, self.inner.caller, "query");
}

fn preprocess_urls(urls: Vec<Url>) -> Result<Url, RedisError> {
Expand Down Expand Up @@ -629,21 +622,29 @@ impl RedisCacheStorage {
let _: () = pipeline
.get(&key)
.await
.inspect_err(|e| self.record_error(e))?;
.inspect_err(|e| self.record_query_error(e))?;
let _: () = pipeline
.expire(&key, ttl.as_secs() as i64, None)
.await
.inspect_err(|e| self.record_error(e))?;
.inspect_err(|e| self.record_query_error(e))?;

let (value, _timeout_set): (RedisValue<V>, bool) =
pipeline.all().await.inspect_err(|e| self.record_error(e))?;
let (value, _timeout_set): (RedisValue<V>, bool) = pipeline
.all()
.await
.inspect_err(|e| self.record_query_error(e))?;
Ok(value)
} else if self.is_cluster {
let client = self.client().replicas().with_options(&options);
client.get(key).await.inspect_err(|e| self.record_error(e))
client
.get(key)
.await
.inspect_err(|e| self.record_query_error(e))
} else {
let client = self.client().with_options(&options);
client.get(key).await.inspect_err(|e| self.record_error(e))
client
.get(key)
.await
.inspect_err(|e| self.record_query_error(e))
}
}

Expand Down Expand Up @@ -694,7 +695,7 @@ impl RedisCacheStorage {
results_with_indexes.sort_unstable_by_key(|(index, _)| *index);
Ok(results_with_indexes
.into_iter()
.map(|(_, value)| value.inspect_err(|e| self.record_error(e)))
.map(|(_, value)| value.inspect_err(|e| self.record_query_error(e)))
.collect())
} else {
let keys = keys
Expand All @@ -706,7 +707,7 @@ impl RedisCacheStorage {
.with_options(&options)
.mget(keys)
.await
.inspect_err(|e| self.record_error(e))?;
.inspect_err(|e| self.record_query_error(e))?;
Ok(values
.into_iter()
.map(|v| v.ok_or(RedisError::new(RedisErrorKind::NotFound, "")))
Expand All @@ -731,7 +732,7 @@ impl RedisCacheStorage {
tracing::trace!("insert result {:?}", result);

if let Err(err) = result {
self.record_error(&err);
self.record_query_error(&err);
}
}

Expand All @@ -758,7 +759,7 @@ impl RedisCacheStorage {
Ok(values) => tracing::trace!("successfully inserted {} values", values.len()),
Err(err) => {
tracing::trace!("caught error during insert: {err:?}");
self.record_error(&err);
self.record_query_error(&err);
}
}
}
Expand Down Expand Up @@ -799,7 +800,7 @@ impl RedisCacheStorage {

let mut total = 0;
for result in results {
let count = result.inspect_err(|e| self.record_error(e))?;
let count = result.inspect_err(|e| self.record_query_error(e))?;
total += count;
}

Expand Down
3 changes: 3 additions & 0 deletions apollo-router/tests/integration/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,9 @@ async fn test_redis_connections_are_closed_on_router_reload() {
router.assert_reloaded().await;

router.assert_metrics_contains(expected_metric, None).await;

let error_metric = "apollo_router_cache_redis_errors_total";
router.assert_metrics_does_not_contain(error_metric).await;
}

#[tokio::test(flavor = "multi_thread")]
Expand Down