From ddd1a96561e70599d8332b0907ec00df930d324b Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 26 Jul 2023 15:15:22 -0400 Subject: [PATCH 1/2] Refactor DeserializationException Detection Code - move to `ErrorHandlingUtils` for reuse. --- .../AbstractMessageListenerContainer.java | 3 +- .../kafka/listener/ErrorHandlingUtils.java | 70 +++++++++++++++++++ .../KafkaMessageListenerContainer.java | 52 ++------------ 3 files changed, 76 insertions(+), 49 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 03c57a728d..5018c55571 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2022 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -82,6 +82,7 @@ public abstract class AbstractMessageListenerContainer protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass())); // NOSONAR + @NonNull protected final ConsumerFactory consumerFactory; // NOSONAR (final) private final ContainerProperties containerProperties; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index f4901ea1ba..5b0710be80 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -18,19 +18,27 @@ import java.time.Duration; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.function.BiConsumer; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.springframework.classify.BinaryExceptionClassifier; +import org.springframework.context.ApplicationContext; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.KafkaException; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.support.KafkaUtils; +import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; +import org.springframework.lang.Nullable; +import org.springframework.util.ClassUtils; import org.springframework.util.backoff.BackOff; import org.springframework.util.backoff.BackOffExecution; @@ -256,4 +264,66 @@ public static Exception findRootCause(Exception exception) { return realException; } + /** + * Determine whether the key or value deserializer is an instance of + * {@link ErrorHandlingDeserializer}. + * @param the key type. + * @param the value type. + * @param consumerFactory the consumer factory. + * @param consumerOverrides consumer factory property overrides. + * @param isValue true to find the value deserializer. + * @param applicationContext the application context. + * @return true if the deserializer is an instance of + * {@link ErrorHandlingDeserializer}. + * @since 3.0.10 + */ + public static boolean checkDeserializer(ConsumerFactory consumerFactory, + Properties consumerOverrides, boolean isValue, @Nullable ApplicationContext applicationContext) { + + Object deser = findDeserializerClass(consumerFactory, consumerOverrides, isValue); + Class deserializer = null; + if (deser instanceof Class deserClass) { + deserializer = deserClass; + } + else if (deser instanceof String str) { + try { + ClassLoader classLoader = applicationContext == null + ? consumerFactory.getClass().getClassLoader() + : applicationContext.getClassLoader(); + deserializer = ClassUtils.forName(str, classLoader); + } + catch (ClassNotFoundException | LinkageError e) { + throw new IllegalStateException(e); + } + } + else if (deser != null) { + throw new IllegalStateException("Deserializer must be a class or class name, not a " + deser.getClass()); + } + return deserializer != null && ErrorHandlingDeserializer.class.isAssignableFrom(deserializer); + } + + @Nullable + private static Object findDeserializerClass(ConsumerFactory consumerFactory, + Properties consumerOverrides, boolean isValue) { + + Map props = consumerFactory.getConfigurationProperties(); + Object configuredDeserializer = isValue + ? consumerFactory.getValueDeserializer() + : consumerFactory.getKeyDeserializer(); + if (configuredDeserializer == null) { + Object deser = consumerOverrides.get(isValue + ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + : ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + if (deser == null) { + deser = props.get(isValue + ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + : ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); + } + return deser; + } + else { + return configuredDeserializer.getClass(); + } + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index daa41eff95..bb95178072 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -115,7 +115,6 @@ import org.springframework.kafka.support.micrometer.KafkaRecordReceiverContext; import org.springframework.kafka.support.micrometer.MicrometerHolder; import org.springframework.kafka.support.serializer.DeserializationException; -import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer; import org.springframework.kafka.support.serializer.SerializationUtils; import org.springframework.kafka.transaction.KafkaAwareTransactionManager; import org.springframework.lang.Nullable; @@ -129,7 +128,6 @@ import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; @@ -920,9 +918,11 @@ else if (listener instanceof MessageListener) { } Map props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties(); this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull() - || checkDeserializer(findDeserializerClass(props, consumerProperties, false)); + || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, consumerProperties, false, + getApplicationContext()); this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull() - || checkDeserializer(findDeserializerClass(props, consumerProperties, true)); + || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, consumerProperties, true, + getApplicationContext()); this.syncCommitTimeout = determineSyncCommitTimeout(); if (this.containerProperties.getSyncCommitTimeout() == null) { // update the property so we can use it directly from code elsewhere @@ -1247,27 +1247,6 @@ else if (timeout instanceof String str) { } } - @Nullable - private Object findDeserializerClass(Map props, Properties consumerOverrides, boolean isValue) { - Object configuredDeserializer = isValue - ? KafkaMessageListenerContainer.this.consumerFactory.getValueDeserializer() - : KafkaMessageListenerContainer.this.consumerFactory.getKeyDeserializer(); - if (configuredDeserializer == null) { - Object deser = consumerOverrides.get(isValue - ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - : ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); - if (deser == null) { - deser = props.get(isValue - ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG - : ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); - } - return deser; - } - else { - return configuredDeserializer.getClass(); - } - } - private void subscribeOrAssignTopics(final Consumer subscribingConsumer) { if (KafkaMessageListenerContainer.this.topicPartitions == null) { ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener(); @@ -1293,29 +1272,6 @@ private void subscribeOrAssignTopics(final Consumer subscr } } - private boolean checkDeserializer(@Nullable Object deser) { - Class deserializer = null; - if (deser instanceof Class deserClass) { - deserializer = deserClass; - } - else if (deser instanceof String str) { - try { - ApplicationContext applicationContext = getApplicationContext(); - ClassLoader classLoader = applicationContext == null - ? getClass().getClassLoader() - : applicationContext.getClassLoader(); - deserializer = ClassUtils.forName(str, classLoader); - } - catch (ClassNotFoundException | LinkageError e) { - throw new IllegalStateException(e); - } - } - else if (deser != null) { - throw new IllegalStateException("Deserializer must be a class or class name, not a " + deser.getClass()); - } - return deserializer != null && ErrorHandlingDeserializer.class.isAssignableFrom(deserializer); - } - protected void checkConsumer() { long timeSinceLastPoll = System.currentTimeMillis() - this.lastPoll; if (((float) timeSinceLastPoll) / (float) this.containerProperties.getPollTimeout() From 89caea5655435eb1b5c4c72d961946f321b21ea5 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Wed, 26 Jul 2023 15:31:35 -0400 Subject: [PATCH 2/2] Pass class loader into utility method instead of an ApplicationContext. --- .../kafka/listener/ErrorHandlingUtils.java | 8 ++------ .../listener/KafkaMessageListenerContainer.java | 15 +++++++++++---- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java index 5b0710be80..88614ee476 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ErrorHandlingUtils.java @@ -31,7 +31,6 @@ import org.apache.kafka.common.errors.WakeupException; import org.springframework.classify.BinaryExceptionClassifier; -import org.springframework.context.ApplicationContext; import org.springframework.core.log.LogAccessor; import org.springframework.kafka.KafkaException; import org.springframework.kafka.core.ConsumerFactory; @@ -272,13 +271,13 @@ public static Exception findRootCause(Exception exception) { * @param consumerFactory the consumer factory. * @param consumerOverrides consumer factory property overrides. * @param isValue true to find the value deserializer. - * @param applicationContext the application context. + * @param classLoader the class loader to load the deserializer class. * @return true if the deserializer is an instance of * {@link ErrorHandlingDeserializer}. * @since 3.0.10 */ public static boolean checkDeserializer(ConsumerFactory consumerFactory, - Properties consumerOverrides, boolean isValue, @Nullable ApplicationContext applicationContext) { + Properties consumerOverrides, boolean isValue, ClassLoader classLoader) { Object deser = findDeserializerClass(consumerFactory, consumerOverrides, isValue); Class deserializer = null; @@ -287,9 +286,6 @@ public static boolean checkDeserializer(ConsumerFactory consumerFac } else if (deser instanceof String str) { try { - ClassLoader classLoader = applicationContext == null - ? consumerFactory.getClass().getClassLoader() - : applicationContext.getClassLoader(); deserializer = ClassUtils.forName(str, classLoader); } catch (ClassNotFoundException | LinkageError e) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bb95178072..91dc791308 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -917,12 +917,19 @@ else if (listener instanceof MessageListener) { this.logger.info(toString()); } Map props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties(); + ApplicationContext applicationContext = getApplicationContext(); this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull() - || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, consumerProperties, false, - getApplicationContext()); + || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, + consumerProperties, false, + applicationContext == null + ? getClass().getClassLoader() + : applicationContext.getClassLoader()); this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull() - || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, consumerProperties, true, - getApplicationContext()); + || ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory, + consumerProperties, true, + applicationContext == null + ? getClass().getClassLoader() + : applicationContext.getClassLoader()); this.syncCommitTimeout = determineSyncCommitTimeout(); if (this.containerProperties.getSyncCommitTimeout() == null) { // update the property so we can use it directly from code elsewhere