115115import org .springframework .kafka .support .micrometer .KafkaRecordReceiverContext ;
116116import org .springframework .kafka .support .micrometer .MicrometerHolder ;
117117import org .springframework .kafka .support .serializer .DeserializationException ;
118- import org .springframework .kafka .support .serializer .ErrorHandlingDeserializer ;
119118import org .springframework .kafka .support .serializer .SerializationUtils ;
120119import org .springframework .kafka .transaction .KafkaAwareTransactionManager ;
121120import org .springframework .lang .Nullable ;
129128import org .springframework .transaction .support .TransactionSynchronizationManager ;
130129import org .springframework .transaction .support .TransactionTemplate ;
131130import org .springframework .util .Assert ;
132- import org .springframework .util .ClassUtils ;
133131import org .springframework .util .CollectionUtils ;
134132import org .springframework .util .ObjectUtils ;
135133import org .springframework .util .StringUtils ;
@@ -919,10 +917,19 @@ else if (listener instanceof MessageListener) {
919917 this .logger .info (toString ());
920918 }
921919 Map <String , Object > props = KafkaMessageListenerContainer .this .consumerFactory .getConfigurationProperties ();
920+ ApplicationContext applicationContext = getApplicationContext ();
922921 this .checkNullKeyForExceptions = this .containerProperties .isCheckDeserExWhenKeyNull ()
923- || checkDeserializer (findDeserializerClass (props , consumerProperties , false ));
922+ || ErrorHandlingUtils .checkDeserializer (KafkaMessageListenerContainer .this .consumerFactory ,
923+ consumerProperties , false ,
924+ applicationContext == null
925+ ? getClass ().getClassLoader ()
926+ : applicationContext .getClassLoader ());
924927 this .checkNullValueForExceptions = this .containerProperties .isCheckDeserExWhenValueNull ()
925- || checkDeserializer (findDeserializerClass (props , consumerProperties , true ));
928+ || ErrorHandlingUtils .checkDeserializer (KafkaMessageListenerContainer .this .consumerFactory ,
929+ consumerProperties , true ,
930+ applicationContext == null
931+ ? getClass ().getClassLoader ()
932+ : applicationContext .getClassLoader ());
926933 this .syncCommitTimeout = determineSyncCommitTimeout ();
927934 if (this .containerProperties .getSyncCommitTimeout () == null ) {
928935 // update the property so we can use it directly from code elsewhere
@@ -1247,27 +1254,6 @@ else if (timeout instanceof String str) {
12471254 }
12481255 }
12491256
1250- @ Nullable
1251- private Object findDeserializerClass (Map <String , Object > props , Properties consumerOverrides , boolean isValue ) {
1252- Object configuredDeserializer = isValue
1253- ? KafkaMessageListenerContainer .this .consumerFactory .getValueDeserializer ()
1254- : KafkaMessageListenerContainer .this .consumerFactory .getKeyDeserializer ();
1255- if (configuredDeserializer == null ) {
1256- Object deser = consumerOverrides .get (isValue
1257- ? ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG
1258- : ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG );
1259- if (deser == null ) {
1260- deser = props .get (isValue
1261- ? ConsumerConfig .VALUE_DESERIALIZER_CLASS_CONFIG
1262- : ConsumerConfig .KEY_DESERIALIZER_CLASS_CONFIG );
1263- }
1264- return deser ;
1265- }
1266- else {
1267- return configuredDeserializer .getClass ();
1268- }
1269- }
1270-
12711257 private void subscribeOrAssignTopics (final Consumer <? super K , ? super V > subscribingConsumer ) {
12721258 if (KafkaMessageListenerContainer .this .topicPartitions == null ) {
12731259 ConsumerRebalanceListener rebalanceListener = new ListenerConsumerRebalanceListener ();
@@ -1293,29 +1279,6 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
12931279 }
12941280 }
12951281
1296- private boolean checkDeserializer (@ Nullable Object deser ) {
1297- Class <?> deserializer = null ;
1298- if (deser instanceof Class <?> deserClass ) {
1299- deserializer = deserClass ;
1300- }
1301- else if (deser instanceof String str ) {
1302- try {
1303- ApplicationContext applicationContext = getApplicationContext ();
1304- ClassLoader classLoader = applicationContext == null
1305- ? getClass ().getClassLoader ()
1306- : applicationContext .getClassLoader ();
1307- deserializer = ClassUtils .forName (str , classLoader );
1308- }
1309- catch (ClassNotFoundException | LinkageError e ) {
1310- throw new IllegalStateException (e );
1311- }
1312- }
1313- else if (deser != null ) {
1314- throw new IllegalStateException ("Deserializer must be a class or class name, not a " + deser .getClass ());
1315- }
1316- return deserializer != null && ErrorHandlingDeserializer .class .isAssignableFrom (deserializer );
1317- }
1318-
13191282 protected void checkConsumer () {
13201283 long timeSinceLastPoll = System .currentTimeMillis () - this .lastPoll ;
13211284 if (((float ) timeSinceLastPoll ) / (float ) this .containerProperties .getPollTimeout ()
0 commit comments