Skip to content

Commit 53019ca

Browse files
authored
fix: clean up Redis behavior during hot-reload, deduplicate error logging (#8743)
1 parent 9fb0520 commit 53019ca

2 files changed

Lines changed: 35 additions & 31 deletions

File tree

apollo-router/src/cache/redis.rs

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ const DEFAULT_INTERNAL_REDIS_TIMEOUT: Duration = Duration::from_secs(5);
6161
const REDIS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
6262

6363
/// Record a Redis error as a metric, independent of having an active connection
64-
fn record_redis_error(error: &RedisError, caller: &'static str) {
64+
fn record_redis_error(error: &RedisError, caller: &'static str, context: &'static str) {
6565
// Don't track NotFound errors as they're expected for cache misses
6666

6767
let error_type = match error.kind() {
@@ -97,6 +97,7 @@ fn record_redis_error(error: &RedisError, caller: &'static str) {
9797
if !error.is_not_found() && !error.is_canceled() {
9898
tracing::error!(
9999
error_type = error_type,
100+
context = context,
100101
caller = caller,
101102
error = ?error,
102103
"Redis error occurred"
@@ -143,14 +144,13 @@ impl Drop for DropSafeRedisPool {
143144
fn drop(&mut self) {
144145
let inner = self.pool.clone();
145146
let caller = self.caller;
147+
self.heartbeat_abort_handle.abort();
146148
tokio::spawn(async move {
147-
let result = inner.quit().await;
148-
if let Err(err) = result {
149-
tracing::warn!("Caught error while closing unused Redis connections: {err:?}");
150-
record_redis_error(&err, caller);
151-
}
149+
let _ = inner
150+
.quit()
151+
.await
152+
.inspect_err(|err| record_redis_error(err, caller, "shutdown"));
152153
});
153-
self.heartbeat_abort_handle.abort();
154154
// Metrics collector will be dropped automatically and its Drop impl will abort the task
155155
}
156156
}
@@ -248,9 +248,9 @@ where
248248
impl RedisCacheStorage {
249249
pub(crate) async fn new(config: RedisCache, caller: &'static str) -> Result<Self, BoxError> {
250250
let url = Self::preprocess_urls(config.urls)
251-
.inspect_err(|err| record_redis_error(err, caller))?;
251+
.inspect_err(|err| record_redis_error(err, caller, "startup"))?;
252252
let mut client_config = RedisConfig::from_url(url.as_str())
253-
.inspect_err(|err| record_redis_error(err, caller))?;
253+
.inspect_err(|err| record_redis_error(err, caller, "startup"))?;
254254
let is_cluster = client_config.server.is_clustered();
255255

256256
if let Some(username) = config.username {
@@ -358,7 +358,7 @@ impl RedisCacheStorage {
358358
let _ = client_config
359359
.server
360360
.set_cluster_discovery_policy(ClusterDiscoveryPolicy::ConfigEndpoint)
361-
.inspect_err(|err| record_redis_error(err, caller));
361+
.inspect_err(|err| record_redis_error(err, caller, "startup"));
362362
}
363363
})
364364
.with_connection_config(|config| {
@@ -400,14 +400,7 @@ impl RedisCacheStorage {
400400
tokio::spawn(async move {
401401
loop {
402402
match error_rx.recv().await {
403-
Ok((error, Some(server))) => {
404-
tracing::error!("Redis client ({server:?}) error: {error:?}",);
405-
record_redis_error(&error, caller);
406-
}
407-
Ok((error, None)) => {
408-
tracing::error!("Redis client error: {error:?}",);
409-
record_redis_error(&error, caller);
410-
}
403+
Ok((error, _)) => record_redis_error(&error, caller, "client"),
411404
Err(RecvError::Lagged(_)) => continue,
412405
Err(RecvError::Closed) => break,
413406
}
@@ -500,8 +493,8 @@ impl RedisCacheStorage {
500493
}
501494

502495
/// Helper method to record Redis errors for metrics
503-
fn record_error(&self, error: &RedisError) {
504-
record_redis_error(error, self.inner.caller);
496+
fn record_query_error(&self, error: &RedisError) {
497+
record_redis_error(error, self.inner.caller, "query");
505498
}
506499

507500
fn preprocess_urls(urls: Vec<Url>) -> Result<Url, RedisError> {
@@ -629,21 +622,29 @@ impl RedisCacheStorage {
629622
let _: () = pipeline
630623
.get(&key)
631624
.await
632-
.inspect_err(|e| self.record_error(e))?;
625+
.inspect_err(|e| self.record_query_error(e))?;
633626
let _: () = pipeline
634627
.expire(&key, ttl.as_secs() as i64, None)
635628
.await
636-
.inspect_err(|e| self.record_error(e))?;
629+
.inspect_err(|e| self.record_query_error(e))?;
637630

638-
let (value, _timeout_set): (RedisValue<V>, bool) =
639-
pipeline.all().await.inspect_err(|e| self.record_error(e))?;
631+
let (value, _timeout_set): (RedisValue<V>, bool) = pipeline
632+
.all()
633+
.await
634+
.inspect_err(|e| self.record_query_error(e))?;
640635
Ok(value)
641636
} else if self.is_cluster {
642637
let client = self.client().replicas().with_options(&options);
643-
client.get(key).await.inspect_err(|e| self.record_error(e))
638+
client
639+
.get(key)
640+
.await
641+
.inspect_err(|e| self.record_query_error(e))
644642
} else {
645643
let client = self.client().with_options(&options);
646-
client.get(key).await.inspect_err(|e| self.record_error(e))
644+
client
645+
.get(key)
646+
.await
647+
.inspect_err(|e| self.record_query_error(e))
647648
}
648649
}
649650

@@ -694,7 +695,7 @@ impl RedisCacheStorage {
694695
results_with_indexes.sort_unstable_by_key(|(index, _)| *index);
695696
Ok(results_with_indexes
696697
.into_iter()
697-
.map(|(_, value)| value.inspect_err(|e| self.record_error(e)))
698+
.map(|(_, value)| value.inspect_err(|e| self.record_query_error(e)))
698699
.collect())
699700
} else {
700701
let keys = keys
@@ -706,7 +707,7 @@ impl RedisCacheStorage {
706707
.with_options(&options)
707708
.mget(keys)
708709
.await
709-
.inspect_err(|e| self.record_error(e))?;
710+
.inspect_err(|e| self.record_query_error(e))?;
710711
Ok(values
711712
.into_iter()
712713
.map(|v| v.ok_or(RedisError::new(RedisErrorKind::NotFound, "")))
@@ -731,7 +732,7 @@ impl RedisCacheStorage {
731732
tracing::trace!("insert result {:?}", result);
732733

733734
if let Err(err) = result {
734-
self.record_error(&err);
735+
self.record_query_error(&err);
735736
}
736737
}
737738

@@ -758,7 +759,7 @@ impl RedisCacheStorage {
758759
Ok(values) => tracing::trace!("successfully inserted {} values", values.len()),
759760
Err(err) => {
760761
tracing::trace!("caught error during insert: {err:?}");
761-
self.record_error(&err);
762+
self.record_query_error(&err);
762763
}
763764
}
764765
}
@@ -799,7 +800,7 @@ impl RedisCacheStorage {
799800

800801
let mut total = 0;
801802
for result in results {
802-
let count = result.inspect_err(|e| self.record_error(e))?;
803+
let count = result.inspect_err(|e| self.record_query_error(e))?;
803804
total += count;
804805
}
805806

apollo-router/tests/integration/redis.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1585,6 +1585,9 @@ async fn test_redis_connections_are_closed_on_router_reload() {
15851585
router.assert_reloaded().await;
15861586

15871587
router.assert_metrics_contains(expected_metric, None).await;
1588+
1589+
let error_metric = "apollo_router_cache_redis_errors_total";
1590+
router.assert_metrics_does_not_contain(error_metric).await;
15881591
}
15891592

15901593
#[tokio::test(flavor = "multi_thread")]

0 commit comments

Comments
 (0)