From 3b3490f61938ef544be3350d770627525e6323aa Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Wed, 19 Jun 2024 02:49:29 +0900 Subject: [PATCH 01/14] GH-2302: Enable consumer seek only on matching group Id --- .../listener/AbstractConsumerSeekAware.java | 60 +++++++++++++------ .../kafka/listener/ConsumerSeekAware.java | 11 ++++ .../KafkaMessageListenerContainer.java | 3 +- .../listener/ConsumerSeekAwareTests.java | 48 ++++++++++----- 4 files changed, 88 insertions(+), 34 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java index 093a1c3568..4373ea5eb3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2023 the original author or authors. + * Copyright 2019-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ * having to keep track of the callbacks itself. * * @author Gary Russell + * @author Borahm Lee * @since 2.3 * */ @@ -46,43 +47,59 @@ public abstract class AbstractConsumerSeekAware implements ConsumerSeekAware { @Override public void registerSeekCallback(ConsumerSeekCallback callback) { - this.callbackForThread.put(Thread.currentThread(), callback); + if (matchGroupId()) { + this.callbackForThread.put(Thread.currentThread(), callback); + } } @Override public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { - ConsumerSeekCallback threadCallback = this.callbackForThread.get(Thread.currentThread()); - if (threadCallback != null) { - assignments.keySet().forEach(tp -> { - this.callbacks.put(tp, threadCallback); - this.callbacksToTopic.computeIfAbsent(threadCallback, key -> new LinkedList<>()).add(tp); - }); + if (matchGroupId()) { + ConsumerSeekCallback threadCallback = this.callbackForThread.get(Thread.currentThread()); + if (threadCallback != null) { + assignments.keySet() + .forEach(tp -> { + this.callbacks.put(tp, threadCallback); + this.callbacksToTopic.computeIfAbsent(threadCallback, key -> new LinkedList<>()) + .add(tp); + }); + } } } @Override public void onPartitionsRevoked(Collection partitions) { - partitions.forEach(tp -> { - ConsumerSeekCallback removed = this.callbacks.remove(tp); - if (removed != null) { - List topics = this.callbacksToTopic.get(removed); - if (topics != null) { - topics.remove(tp); - if (topics.size() == 0) { - this.callbacksToTopic.remove(removed); + if (matchGroupId()) { + partitions.forEach(tp -> { + ConsumerSeekCallback removed = this.callbacks.remove(tp); + if (removed != null) { + List topics = this.callbacksToTopic.get(removed); + if (topics != null) { + topics.remove(tp); + if (topics.size() == 0) { + this.callbacksToTopic.remove(removed); + } } } - } - }); + }); + } } @Override public void unregisterSeekCallback() { - this.callbackForThread.remove(Thread.currentThread()); + if (matchGroupId()) { + this.callbackForThread.remove(Thread.currentThread()); + } + } + + @Override + public boolean matchGroupId() { + return true; } /** * Return the callback for the specified topic/partition. + * * @param topicPartition the topic/partition. * @return the callback (or null if there is no assignment). */ @@ -93,6 +110,7 @@ protected ConsumerSeekCallback getSeekCallbackFor(TopicPartition topicPartition) /** * The map of callbacks for all currently assigned partitions. + * * @return the map. */ protected Map getSeekCallbacks() { @@ -101,6 +119,7 @@ protected Map getSeekCallbacks() { /** * Return the currently registered callbacks and their associated {@link TopicPartition}(s). + * * @return the map of callbacks and partitions. * @since 2.6 */ @@ -110,6 +129,7 @@ protected Map> getCallbacksAndTopics( /** * Seek all assigned partitions to the beginning. + * * @since 2.6 */ public void seekToBeginning() { @@ -118,6 +138,7 @@ public void seekToBeginning() { /** * Seek all assigned partitions to the end. + * * @since 2.6 */ public void seekToEnd() { @@ -126,6 +147,7 @@ public void seekToEnd() { /** * Seek all assigned partitions to the offset represented by the timestamp. + * * @param time the time to seek to. * @since 2.6 */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index ae4c33cc6e..6fed99ef2a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -29,6 +29,7 @@ * * @author Gary Russell * @author Soby Chacko + * @author Borahm Lee * @since 1.1 * */ @@ -88,6 +89,16 @@ default void onFirstPoll() { default void unregisterSeekCallback() { } + /** + * Determine if the consumer group ID for seeking matches the expected value. + * + * @return true if the group ID matches, false otherwise. + * @since 3.3 + */ + default boolean matchGroupId() { + return false; + } + /** * A callback that a listener can invoke to seek to a specific offset. */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 17653683a0..91cb0136a1 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -165,6 +165,7 @@ * @author Raphael Rösch * @author Christian Mergenthaler * @author Mikael Carlstedt + * @author Borahm Lee */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -1362,8 +1363,8 @@ protected void initialize() { } publishConsumerStartingEvent(); this.consumerThread = Thread.currentThread(); - setupSeeks(); KafkaUtils.setConsumerGroupId(this.consumerGroupId); + setupSeeks(); this.count = 0; this.last = System.currentTimeMillis(); initAssignedPartitions(); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java index d579423f3d..b215aa0074 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java @@ -16,28 +16,27 @@ package org.springframework.kafka.listener; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; - import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback; import org.springframework.kafka.test.utils.KafkaTestUtils; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + /** * @author Gary Russell + * @author Borahm Lee * @since 2.6 - * */ public class ConsumerSeekAwareTests { @@ -51,7 +50,7 @@ class CSA extends AbstractConsumerSeekAware { var exec1 = Executors.newSingleThreadExecutor(); var exec2 = Executors.newSingleThreadExecutor(); var cb1 = mock(ConsumerSeekCallback.class); - var cb2 = mock(ConsumerSeekCallback.class); + var cb2 = mock(ConsumerSeekCallback.class); var first = new AtomicBoolean(true); var map1 = new LinkedHashMap<>(Map.of(new TopicPartition("foo", 0), 0L, new TopicPartition("foo", 1), 0L)); var map2 = new LinkedHashMap<>(Map.of(new TopicPartition("foo", 2), 0L, new TopicPartition("foo", 3), 0L)); @@ -59,8 +58,7 @@ class CSA extends AbstractConsumerSeekAware { if (first.getAndSet(false)) { csa.registerSeekCallback(cb1); csa.onPartitionsAssigned(map1, null); - } - else { + } else { csa.registerSeekCallback(cb2); csa.onPartitionsAssigned(map2, null); } @@ -80,8 +78,7 @@ class CSA extends AbstractConsumerSeekAware { var revoke1 = (Callable) () -> { if (!first.getAndSet(true)) { csa.onPartitionsRevoked(Collections.singletonList(map1.keySet().iterator().next())); - } - else { + } else { csa.onPartitionsRevoked(Collections.singletonList(map2.keySet().iterator().next())); } return null; @@ -96,8 +93,7 @@ class CSA extends AbstractConsumerSeekAware { var revoke2 = (Callable) () -> { if (first.getAndSet(false)) { csa.onPartitionsRevoked(Collections.singletonList(map1.keySet().iterator().next())); - } - else { + } else { csa.onPartitionsRevoked(Collections.singletonList(map2.keySet().iterator().next())); } return null; @@ -118,4 +114,28 @@ class CSA extends AbstractConsumerSeekAware { exec2.shutdown(); } + @SuppressWarnings("unchecked") + @Test + void notMatchedGroupId() throws ExecutionException, InterruptedException { + class CSA extends AbstractConsumerSeekAware { + @Override + public boolean matchGroupId() { + return false; + } + } + + AbstractConsumerSeekAware csa = new CSA(); + var exec = Executors.newSingleThreadExecutor(); + var register = (Callable) () -> { + csa.registerSeekCallback(mock(ConsumerSeekCallback.class)); + csa.onPartitionsAssigned(Map.of(new TopicPartition("baz", 0), 0L), null); + return null; + }; + exec.submit(register).get(); + assertThat(KafkaTestUtils.getPropertyValue(csa, "callbackForThread", Map.class)).isEmpty(); + assertThat(KafkaTestUtils.getPropertyValue(csa, "callbacks", Map.class)).isEmpty(); + assertThat(KafkaTestUtils.getPropertyValue(csa, "callbacksToTopic", Map.class)).isEmpty(); + exec.shutdown(); + } + } From a81a06dc5846ceb7d169d832ef2ef684bd2b7c67 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Sat, 22 Jun 2024 23:27:26 +0900 Subject: [PATCH 02/14] Add groupId getter to ConsumerSeekCallback and remove match method --- .../listener/AbstractConsumerSeekAware.java | 60 +++----- .../kafka/listener/ConsumerSeekAware.java | 71 ++++++---- .../KafkaMessageListenerContainer.java | 11 +- .../AbstractConsumerSeekAwareTests.java | 133 ++++++++++++++++++ .../listener/ConsumerSeekAwareTests.java | 48 ++----- 5 files changed, 218 insertions(+), 105 deletions(-) create mode 100644 spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java index 4373ea5eb3..093a1c3568 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractConsumerSeekAware.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2024 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,7 +33,6 @@ * having to keep track of the callbacks itself. * * @author Gary Russell - * @author Borahm Lee * @since 2.3 * */ @@ -47,59 +46,43 @@ public abstract class AbstractConsumerSeekAware implements ConsumerSeekAware { @Override public void registerSeekCallback(ConsumerSeekCallback callback) { - if (matchGroupId()) { - this.callbackForThread.put(Thread.currentThread(), callback); - } + this.callbackForThread.put(Thread.currentThread(), callback); } @Override public void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { - if (matchGroupId()) { - ConsumerSeekCallback threadCallback = this.callbackForThread.get(Thread.currentThread()); - if (threadCallback != null) { - assignments.keySet() - .forEach(tp -> { - this.callbacks.put(tp, threadCallback); - this.callbacksToTopic.computeIfAbsent(threadCallback, key -> new LinkedList<>()) - .add(tp); - }); - } + ConsumerSeekCallback threadCallback = this.callbackForThread.get(Thread.currentThread()); + if (threadCallback != null) { + assignments.keySet().forEach(tp -> { + this.callbacks.put(tp, threadCallback); + this.callbacksToTopic.computeIfAbsent(threadCallback, key -> new LinkedList<>()).add(tp); + }); } } @Override public void onPartitionsRevoked(Collection partitions) { - if (matchGroupId()) { - partitions.forEach(tp -> { - ConsumerSeekCallback removed = this.callbacks.remove(tp); - if (removed != null) { - List topics = this.callbacksToTopic.get(removed); - if (topics != null) { - topics.remove(tp); - if (topics.size() == 0) { - this.callbacksToTopic.remove(removed); - } + partitions.forEach(tp -> { + ConsumerSeekCallback removed = this.callbacks.remove(tp); + if (removed != null) { + List topics = this.callbacksToTopic.get(removed); + if (topics != null) { + topics.remove(tp); + if (topics.size() == 0) { + this.callbacksToTopic.remove(removed); } } - }); - } + } + }); } @Override public void unregisterSeekCallback() { - if (matchGroupId()) { - this.callbackForThread.remove(Thread.currentThread()); - } - } - - @Override - public boolean matchGroupId() { - return true; + this.callbackForThread.remove(Thread.currentThread()); } /** * Return the callback for the specified topic/partition. - * * @param topicPartition the topic/partition. * @return the callback (or null if there is no assignment). */ @@ -110,7 +93,6 @@ protected ConsumerSeekCallback getSeekCallbackFor(TopicPartition topicPartition) /** * The map of callbacks for all currently assigned partitions. - * * @return the map. */ protected Map getSeekCallbacks() { @@ -119,7 +101,6 @@ protected Map getSeekCallbacks() { /** * Return the currently registered callbacks and their associated {@link TopicPartition}(s). - * * @return the map of callbacks and partitions. * @since 2.6 */ @@ -129,7 +110,6 @@ protected Map> getCallbacksAndTopics( /** * Seek all assigned partitions to the beginning. - * * @since 2.6 */ public void seekToBeginning() { @@ -138,7 +118,6 @@ public void seekToBeginning() { /** * Seek all assigned partitions to the end. - * * @since 2.6 */ public void seekToEnd() { @@ -147,7 +126,6 @@ public void seekToEnd() { /** * Seek all assigned partitions to the offset represented by the timestamp. - * * @param time the time to seek to. * @since 2.6 */ diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index 6fed99ef2a..868d0128a9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -19,8 +19,8 @@ import java.util.Collection; import java.util.Map; import java.util.function.Function; - import org.apache.kafka.common.TopicPartition; +import org.springframework.lang.Nullable; /** * Listeners that implement this interface are provided with a @@ -31,7 +31,6 @@ * @author Soby Chacko * @author Borahm Lee * @since 1.1 - * */ public interface ConsumerSeekAware { @@ -40,6 +39,7 @@ public interface ConsumerSeekAware { * {@code ConcurrentMessageListenerContainer} or the same listener instance in multiple * containers listeners should store the callback in a {@code ThreadLocal} or a map keyed * by the thread. + * * @param callback the callback. */ default void registerSeekCallback(ConsumerSeekCallback callback) { @@ -47,8 +47,9 @@ default void registerSeekCallback(ConsumerSeekCallback callback) { /** * When using group management, called when partition assignments change. + * * @param assignments the new assignments and their current offsets. - * @param callback the callback to perform an initial seek after assignment. + * @param callback the callback to perform an initial seek after assignment. */ default void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { } @@ -57,6 +58,7 @@ default void onPartitionsAssigned(Map assignments, Consume * When using group management, called when partition assignments are revoked. * Listeners should discard any callback saved from * {@link #registerSeekCallback(ConsumerSeekCallback)} on this thread. + * * @param partitions the partitions that have been revoked. * @since 2.3 */ @@ -66,8 +68,9 @@ default void onPartitionsRevoked(Collection partitions) { /** * If the container is configured to emit idle container events, this method is called * when the container idle event is emitted - allowing a seek operation. + * * @param assignments the new assignments and their current offsets. - * @param callback the callback to perform a seek. + * @param callback the callback to perform a seek. */ default void onIdleContainer(Map assignments, ConsumerSeekCallback callback) { } @@ -76,6 +79,7 @@ default void onIdleContainer(Map assignments, ConsumerSeek * When using manual partition assignment, called when the first poll has completed; * useful when using {@code auto.offset.reset=latest} and you need to wait until the * initial position has been established. + * * @since 2.8.8 */ default void onFirstPoll() { @@ -84,21 +88,12 @@ default void onFirstPoll() { /** * Called when the listener consumer terminates allowing implementations to clean up * state, such as thread locals. + * * @since 2.4 */ default void unregisterSeekCallback() { } - /** - * Determine if the consumer group ID for seeking matches the expected value. - * - * @return true if the group ID matches, false otherwise. - * @since 3.3 - */ - default boolean matchGroupId() { - return false; - } - /** * A callback that a listener can invoke to seek to a specific offset. */ @@ -112,9 +107,10 @@ interface ConsumerSeekCallback { * queue the seek operation to the consumer. The queued seek will occur after any * pending offset commits. The consumer must be currently assigned the specified * partition. - * @param topic the topic. + * + * @param topic the topic. * @param partition the partition. - * @param offset the offset (absolute). + * @param offset the offset (absolute). */ void seek(String topic, int partition, long offset); @@ -128,8 +124,9 @@ interface ConsumerSeekCallback { * queue the seek operation to the consumer. The queued seek will occur after any * pending offset commits. The consumer must be currently assigned the specified * partition. - * @param topic the topic. - * @param partition the partition. + * + * @param topic the topic. + * @param partition the partition. * @param offsetComputeFunction function to compute the absolute offset to seek to. * @since 3.2.0 */ @@ -143,7 +140,8 @@ interface ConsumerSeekCallback { * the seek operation to the consumer. The queued seek will occur after * any pending offset commits. The consumer must be currently assigned the * specified partition. - * @param topic the topic. + * + * @param topic the topic. * @param partition the partition. */ void seekToBeginning(String topic, int partition); @@ -156,6 +154,7 @@ interface ConsumerSeekCallback { * queue the seek operation to the consumer for each * {@link TopicPartition}. The seek will occur after any pending offset commits. * The consumer must be currently assigned the specified partition(s). + * * @param partitions the {@link TopicPartition}s. * @since 2.3.4 */ @@ -171,7 +170,8 @@ default void seekToBeginning(Collection partitions) { * the seek operation to the consumer. The queued seek will occur after any * pending offset commits. The consumer must be currently assigned the specified * partition. - * @param topic the topic. + * + * @param topic the topic. * @param partition the partition. */ void seekToEnd(String topic, int partition); @@ -184,6 +184,7 @@ default void seekToBeginning(Collection partitions) { * the seek operation to the consumer for each {@link TopicPartition}. The queued * seek(s) will occur after any pending offset commits. The consumer must be * currently assigned the specified partition(s). + * * @param partitions the {@link TopicPartition}s. * @since 2.3.4 */ @@ -198,12 +199,13 @@ default void seekToEnd(Collection partitions) { * perform the seek immediately on the consumer. When called from elsewhere, queue * the seek operation. The queued seek will occur after any pending offset * commits. The consumer must be currently assigned the specified partition. - * @param topic the topic. + * + * @param topic the topic. * @param partition the partition. - * @param offset the offset; positive values are relative to the start, negative - * values are relative to the end, unless toCurrent is true. + * @param offset the offset; positive values are relative to the start, negative + * values are relative to the end, unless toCurrent is true. * @param toCurrent true for the offset to be relative to the current position - * rather than the beginning or end. + * rather than the beginning or end. * @since 2.3 */ void seekRelative(String topic, int partition, long offset, boolean toCurrent); @@ -218,11 +220,12 @@ default void seekToEnd(Collection partitions) { * commits. The consumer must be currently assigned the specified partition. Use * {@link #seekToTimestamp(Collection, long)} when seeking multiple partitions * because the offset lookup is blocking. - * @param topic the topic. + * + * @param topic the topic. * @param partition the partition. * @param timestamp the time stamp. - * @since 2.3 * @see #seekToTimestamp(Collection, long) + * @since 2.3 */ void seekToTimestamp(String topic, int partition, long timestamp); @@ -234,12 +237,26 @@ default void seekToEnd(Collection partitions) { * perform the seek immediately on the consumer. When called from elsewhere, queue * the seek operation. The queued seek will occur after any pending offset * commits. The consumer must be currently assigned the specified partition. + * * @param topicPartitions the topic/partitions. - * @param timestamp the time stamp. + * @param timestamp the time stamp. * @since 2.3 */ void seekToTimestamp(Collection topicPartitions, long timestamp); + + /** + * Retrieve the group ID associated with this consumer seek callback, if available. + * This method returns {@code null} by default, indicating that the group ID is not specified. + * Implementations may override this method to provide a specific group ID value. + * + * @return the consumer group ID. + * @since 3.3 + */ + @Nullable + default String getGroupId() { + return null; + } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 91cb0136a1..f1f84940be 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -682,7 +682,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final TransactionTemplate transactionTemplate; - private final String consumerGroupId = getGroupId(); + private final String consumerGroupId = KafkaMessageListenerContainer.this.getGroupId(); private final TaskScheduler taskScheduler; @@ -1907,7 +1907,7 @@ private void wrapUp(@Nullable Throwable throwable) { this.consumerSeekAwareListener.onPartitionsRevoked(partitions); this.consumerSeekAwareListener.unregisterSeekCallback(); } - this.logger.info(() -> getGroupId() + ": Consumer stopped"); + this.logger.info(() -> KafkaMessageListenerContainer.this.getGroupId() + ": Consumer stopped"); publishConsumerStoppedEvent(throwable); } @@ -2694,7 +2694,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation( this.containerProperties.getObservationConvention(), DefaultKafkaListenerObservationConvention.INSTANCE, - () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(), + () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), KafkaMessageListenerContainer.this.getGroupId(), this::clusterId), this.observationRegistry); return observation.observe(() -> { @@ -3328,6 +3328,11 @@ public void seekToTimestamp(Collection topicParts, long timestam topicParts.forEach(tp -> seekToTimestamp(tp.topic(), tp.partition(), timestamp)); } + @Override + public String getGroupId() { + return KafkaMessageListenerContainer.this.getGroupId(); + } + @Override public String toString() { return "KafkaMessageListenerContainer.ListenerConsumer [" diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java new file mode 100644 index 0000000000..77cdea8027 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -0,0 +1,133 @@ +package org.springframework.kafka.listener; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.AbstractConsumerSeekAwareTests.Config.MultiGroupListener; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@DirtiesContext +@SpringJUnitConfig +@EmbeddedKafka(topics = {AbstractConsumerSeekAwareTests.TOPIC}, partitions = 3) +public class +AbstractConsumerSeekAwareTests { + + static final String TOPIC = "Seek"; + + @Autowired + Config config; + + @Autowired + KafkaTemplate template; + + @Autowired + MultiGroupListener multiGroupListener; + + @Test + public void seekForAllGroups() throws Exception { + template.send(TOPIC, "test-data"); + template.send(TOPIC, "test-data"); + assertTrue(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)); + assertTrue(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)); + + MultiGroupListener.latch1 = new CountDownLatch(2); + MultiGroupListener.latch2 = new CountDownLatch(2); + + multiGroupListener.seekToBeginning(); + assertTrue(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)); + assertTrue(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)); + } + + @Test + public void seekForSpecificGroup() throws Exception { + template.send(TOPIC, "test-data"); + template.send(TOPIC, "test-data"); + assertTrue(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)); + assertTrue(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)); + + MultiGroupListener.latch1 = new CountDownLatch(2); + MultiGroupListener.latch2 = new CountDownLatch(2); + + multiGroupListener.seekToBeginningForGroup("group2"); + assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2); + assertTrue(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)); + } + + @EnableKafka + @Configuration + static class Config { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + return factory; + } + + @Bean + ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("test-group", "false", this.broker)); + } + + @Bean + ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.broker)); + } + + @Bean + KafkaTemplate template(ProducerFactory pf) { + return new KafkaTemplate<>(pf); + } + + @Component + static class MultiGroupListener extends AbstractConsumerSeekAware { + + static CountDownLatch latch1 = new CountDownLatch(2); + static CountDownLatch latch2 = new CountDownLatch(2); + + @KafkaListener(groupId = "group1", topics = TOPIC) + void listenForGroup1(String in) { +// System.out.printf("[group1] in = %s\n", in); // TODO remove + latch1.countDown(); + } + + @KafkaListener(groupId = "group2", topics = TOPIC) + void listenForGroup2(String in) { +// System.out.printf("[group2] in = %s\n", in); // TODO remove + latch2.countDown(); + } + + public void seekToBeginningForGroup(String groupIdForSeek) { + getCallbacksAndTopics().forEach((cb, topics) -> { + if (groupIdForSeek.equals(cb.getGroupId())) { + topics.forEach(tp -> cb.seekToBeginning(tp.topic(), tp.partition())); + } + }); + } + } + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java index b215aa0074..d579423f3d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ConsumerSeekAwareTests.java @@ -16,27 +16,28 @@ package org.springframework.kafka.listener; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + import java.util.Collections; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; + import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback; import org.springframework.kafka.test.utils.KafkaTestUtils; -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; - /** * @author Gary Russell - * @author Borahm Lee * @since 2.6 + * */ public class ConsumerSeekAwareTests { @@ -50,7 +51,7 @@ class CSA extends AbstractConsumerSeekAware { var exec1 = Executors.newSingleThreadExecutor(); var exec2 = Executors.newSingleThreadExecutor(); var cb1 = mock(ConsumerSeekCallback.class); - var cb2 = mock(ConsumerSeekCallback.class); + var cb2 = mock(ConsumerSeekCallback.class); var first = new AtomicBoolean(true); var map1 = new LinkedHashMap<>(Map.of(new TopicPartition("foo", 0), 0L, new TopicPartition("foo", 1), 0L)); var map2 = new LinkedHashMap<>(Map.of(new TopicPartition("foo", 2), 0L, new TopicPartition("foo", 3), 0L)); @@ -58,7 +59,8 @@ class CSA extends AbstractConsumerSeekAware { if (first.getAndSet(false)) { csa.registerSeekCallback(cb1); csa.onPartitionsAssigned(map1, null); - } else { + } + else { csa.registerSeekCallback(cb2); csa.onPartitionsAssigned(map2, null); } @@ -78,7 +80,8 @@ class CSA extends AbstractConsumerSeekAware { var revoke1 = (Callable) () -> { if (!first.getAndSet(true)) { csa.onPartitionsRevoked(Collections.singletonList(map1.keySet().iterator().next())); - } else { + } + else { csa.onPartitionsRevoked(Collections.singletonList(map2.keySet().iterator().next())); } return null; @@ -93,7 +96,8 @@ class CSA extends AbstractConsumerSeekAware { var revoke2 = (Callable) () -> { if (first.getAndSet(false)) { csa.onPartitionsRevoked(Collections.singletonList(map1.keySet().iterator().next())); - } else { + } + else { csa.onPartitionsRevoked(Collections.singletonList(map2.keySet().iterator().next())); } return null; @@ -114,28 +118,4 @@ class CSA extends AbstractConsumerSeekAware { exec2.shutdown(); } - @SuppressWarnings("unchecked") - @Test - void notMatchedGroupId() throws ExecutionException, InterruptedException { - class CSA extends AbstractConsumerSeekAware { - @Override - public boolean matchGroupId() { - return false; - } - } - - AbstractConsumerSeekAware csa = new CSA(); - var exec = Executors.newSingleThreadExecutor(); - var register = (Callable) () -> { - csa.registerSeekCallback(mock(ConsumerSeekCallback.class)); - csa.onPartitionsAssigned(Map.of(new TopicPartition("baz", 0), 0L), null); - return null; - }; - exec.submit(register).get(); - assertThat(KafkaTestUtils.getPropertyValue(csa, "callbackForThread", Map.class)).isEmpty(); - assertThat(KafkaTestUtils.getPropertyValue(csa, "callbacks", Map.class)).isEmpty(); - assertThat(KafkaTestUtils.getPropertyValue(csa, "callbacksToTopic", Map.class)).isEmpty(); - exec.shutdown(); - } - } From 991bc8f6df7157db9d6858aa135f0c32b04bfa2b Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Sat, 22 Jun 2024 23:44:47 +0900 Subject: [PATCH 03/14] remove line for class --- .../kafka/listener/AbstractConsumerSeekAwareTests.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java index 77cdea8027..e9f8b239d7 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -28,8 +28,7 @@ @DirtiesContext @SpringJUnitConfig @EmbeddedKafka(topics = {AbstractConsumerSeekAwareTests.TOPIC}, partitions = 3) -public class -AbstractConsumerSeekAwareTests { +public class AbstractConsumerSeekAwareTests { static final String TOPIC = "Seek"; From a724510a8d86588d73863ac7ebe3867c30a60af7 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 00:45:15 +0900 Subject: [PATCH 04/14] Resolve checkstyle violations and add missing copyright section, author tag etc. --- .../kafka/listener/ConsumerSeekAware.java | 4 +- .../AbstractConsumerSeekAwareTests.java | 44 ++++++++++++++----- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index 868d0128a9..0f8b1cff07 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -19,7 +19,9 @@ import java.util.Collection; import java.util.Map; import java.util.function.Function; + import org.apache.kafka.common.TopicPartition; + import org.springframework.lang.Nullable; /** @@ -224,8 +226,8 @@ default void seekToEnd(Collection partitions) { * @param topic the topic. * @param partition the partition. * @param timestamp the time stamp. - * @see #seekToTimestamp(Collection, long) * @since 2.3 + * @see #seekToTimestamp(Collection, long) */ void seekToTimestamp(String topic, int partition, long timestamp); diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java index e9f8b239d7..167b3b4802 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -1,8 +1,28 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.springframework.kafka.listener; +import static org.assertj.core.api.Assertions.assertThat; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import org.junit.jupiter.api.Test; + import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -22,9 +42,10 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; - +/** + * @author Borahm Lee + * @since 3.3 + */ @DirtiesContext @SpringJUnitConfig @EmbeddedKafka(topics = {AbstractConsumerSeekAwareTests.TOPIC}, partitions = 3) @@ -45,30 +66,30 @@ public class AbstractConsumerSeekAwareTests { public void seekForAllGroups() throws Exception { template.send(TOPIC, "test-data"); template.send(TOPIC, "test-data"); - assertTrue(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)); - assertTrue(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)); + assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); MultiGroupListener.latch1 = new CountDownLatch(2); MultiGroupListener.latch2 = new CountDownLatch(2); multiGroupListener.seekToBeginning(); - assertTrue(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)); - assertTrue(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)); + assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); } @Test public void seekForSpecificGroup() throws Exception { template.send(TOPIC, "test-data"); template.send(TOPIC, "test-data"); - assertTrue(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)); - assertTrue(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)); + assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); MultiGroupListener.latch1 = new CountDownLatch(2); MultiGroupListener.latch2 = new CountDownLatch(2); multiGroupListener.seekToBeginningForGroup("group2"); assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2); - assertTrue(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)); + assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); } @EnableKafka @@ -105,17 +126,16 @@ KafkaTemplate template(ProducerFactory pf) { static class MultiGroupListener extends AbstractConsumerSeekAware { static CountDownLatch latch1 = new CountDownLatch(2); + static CountDownLatch latch2 = new CountDownLatch(2); @KafkaListener(groupId = "group1", topics = TOPIC) void listenForGroup1(String in) { -// System.out.printf("[group1] in = %s\n", in); // TODO remove latch1.countDown(); } @KafkaListener(groupId = "group2", topics = TOPIC) void listenForGroup2(String in) { -// System.out.printf("[group2] in = %s\n", in); // TODO remove latch2.countDown(); } From d7ce6044f3962d23507850e64bbb7f0cc6219d86 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 01:11:09 +0900 Subject: [PATCH 05/14] Drop `public` modifiers --- .../kafka/listener/AbstractConsumerSeekAwareTests.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java index 167b3b4802..3d613afc57 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -49,7 +49,7 @@ @DirtiesContext @SpringJUnitConfig @EmbeddedKafka(topics = {AbstractConsumerSeekAwareTests.TOPIC}, partitions = 3) -public class AbstractConsumerSeekAwareTests { +class AbstractConsumerSeekAwareTests { static final String TOPIC = "Seek"; @@ -63,7 +63,7 @@ public class AbstractConsumerSeekAwareTests { MultiGroupListener multiGroupListener; @Test - public void seekForAllGroups() throws Exception { + void seekForAllGroups() throws Exception { template.send(TOPIC, "test-data"); template.send(TOPIC, "test-data"); assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); @@ -78,7 +78,7 @@ public void seekForAllGroups() throws Exception { } @Test - public void seekForSpecificGroup() throws Exception { + void seekForSpecificGroup() throws Exception { template.send(TOPIC, "test-data"); template.send(TOPIC, "test-data"); assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); @@ -100,7 +100,7 @@ static class Config { EmbeddedKafkaBroker broker; @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( ConsumerFactory consumerFactory) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); @@ -139,7 +139,7 @@ void listenForGroup2(String in) { latch2.countDown(); } - public void seekToBeginningForGroup(String groupIdForSeek) { + void seekToBeginningForGroup(String groupIdForSeek) { getCallbacksAndTopics().forEach((cb, topics) -> { if (groupIdForSeek.equals(cb.getGroupId())) { topics.forEach(tp -> cb.seekToBeginning(tp.topic(), tp.partition())); From 2f3cdb7ed72fb03ae86abea62a6f4ef885141478 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 02:10:11 +0900 Subject: [PATCH 06/14] Update "seek.adoc" and "whats-new.adoc" --- .../main/antora/modules/ROOT/pages/kafka/seek.adoc | 11 ++++++++++- .../src/main/antora/modules/ROOT/pages/whats-new.adoc | 11 ++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc index 42653c51d6..405f406318 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc @@ -9,7 +9,7 @@ void registerSeekCallback(ConsumerSeekCallback callback); void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback); -void onPartitionsRevoked(Collection partitions) +void onPartitionsRevoked(Collection partitions); void onIdleContainer(Map assignments, ConsumerSeekCallback callback); ---- @@ -49,6 +49,8 @@ void seekRelative(String topic, int partition, long offset, boolean toCurrent); void seekToTimestamp(String topic, int partition, long timestamp); void seekToTimestamp(Collection topicPartitions, long timestamp); + +String getGroupId(); ---- The two different variants of the `seek` methods provide a way to seek to an arbitrary offset. @@ -232,4 +234,11 @@ public class SomeOtherBean { ---- +As of version 3.3, a new method `getGroupId()` was introduced in the `ConsumerSeekAware.ConsumerSeekCallback` interface. +This method is particularly useful when you need to identify the consumer group associated with a specific seek callback. + +NOTE: When using a class that extends `AbstractConsumerSeekAware`, a seek operation performed in one listener may impact all listeners. +This might not always be the desired behavior. +To address this, you can use the `getGroupId()` method provided by the callback. +This allows you to perform seek operations selectively, targeting only the consumer group of interest. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index be14dcdbde..9b990000e9 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -7,13 +7,18 @@ This section covers the changes made from version 3.2 to version 3.3. For changes in earlier version, see xref:appendix/change-history.adoc[Change History]. - [[x33-dlt-topic-naming]] === DLT Topic Naming Convention -The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property. - +The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. +This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. +Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property. +[[x33-seek-with-group-id]] +=== Enhanced Seek Operations for Consumer Groups +A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` interface. +This method allows for more selective seek operations by targeting only the desired consumer group. +For more details, see xref:kafka/seek.adoc#seek[Seek API Docs]. From f94938c7ed94bca02e846ef564afc97bae957cbc Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 02:15:18 +0900 Subject: [PATCH 07/14] Restore dlt style in "whats-new.adoc" --- .../src/main/antora/modules/ROOT/pages/whats-new.adoc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 9b990000e9..345f1e99b7 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -10,9 +10,7 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His [[x33-dlt-topic-naming]] === DLT Topic Naming Convention -The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. -This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. -Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property. +The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property. [[x33-seek-with-group-id]] === Enhanced Seek Operations for Consumer Groups From 1ef1ccd4cc6cf83925688e2e05a77834323df23a Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 02:20:01 +0900 Subject: [PATCH 08/14] Update "whats-new.adoc" --- .../main/antora/modules/ROOT/pages/kafka/seek.adoc | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc index 405f406318..19b7c5d359 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc @@ -3,7 +3,7 @@ In order to seek, your listener must implement `ConsumerSeekAware`, which has the following methods: -[source, java] +[source,java] ---- void registerSeekCallback(ConsumerSeekCallback callback); @@ -30,7 +30,7 @@ You should discard this thread's callback and remove any associations to the rev The callback has the following methods: -[source, java] +[source,java] ---- void seek(String topic, int partition, long offset); @@ -75,7 +75,7 @@ See xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Con NOTE: The `seekToBeginning` method that accepts a collection is useful, for example, when processing a compacted topic and you wish to seek to the beginning every time the application is started: -[source, java] +[source,java] ---- public class MyListener implements ConsumerSeekAware { @@ -93,7 +93,7 @@ To arbitrarily seek at runtime, use the callback reference from the `registerSee Here is a trivial Spring Boot application that demonstrates how to use the callback; it sends 10 records to the topic; hitting `` in the console causes all partitions to seek to the beginning. -[source, java] +[source,java] ---- @SpringBootApplication public class SeekExampleApplication { @@ -166,7 +166,7 @@ To make things simpler, version 2.3 added the `AbstractConsumerSeekAware` class, The following example shows how to seek to the last record processed, in each partition, each time the container goes idle. It also has methods that allow arbitrary external calls to rewind partitions by one record. -[source, java] +[source,java] ---- public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware { @@ -210,7 +210,7 @@ Version 2.6 added convenience methods to the abstract class: Example: -[source, java] +[source,java] ---- public class MyListener extends AbstractConsumerSeekAware { @@ -237,7 +237,7 @@ public class SomeOtherBean { As of version 3.3, a new method `getGroupId()` was introduced in the `ConsumerSeekAware.ConsumerSeekCallback` interface. This method is particularly useful when you need to identify the consumer group associated with a specific seek callback. -NOTE: When using a class that extends `AbstractConsumerSeekAware`, a seek operation performed in one listener may impact all listeners. +NOTE: When using a class that extends `AbstractConsumerSeekAware`, a seek operation performed in one listener may impact all listeners in the same class. This might not always be the desired behavior. To address this, you can use the `getGroupId()` method provided by the callback. This allows you to perform seek operations selectively, targeting only the consumer group of interest. From d79ea1a7a85e4aad83b9c3cffd9285f3993a28cb Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 02:32:11 +0900 Subject: [PATCH 09/14] Update "seek.adoc" --- .../main/antora/modules/ROOT/pages/kafka/seek.adoc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc index 19b7c5d359..8b97c94cb4 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc @@ -3,7 +3,7 @@ In order to seek, your listener must implement `ConsumerSeekAware`, which has the following methods: -[source,java] +[source, java] ---- void registerSeekCallback(ConsumerSeekCallback callback); @@ -30,7 +30,7 @@ You should discard this thread's callback and remove any associations to the rev The callback has the following methods: -[source,java] +[source, java] ---- void seek(String topic, int partition, long offset); @@ -75,7 +75,7 @@ See xref:kafka/events.adoc#idle-containers[Detecting Idle and Non-Responsive Con NOTE: The `seekToBeginning` method that accepts a collection is useful, for example, when processing a compacted topic and you wish to seek to the beginning every time the application is started: -[source,java] +[source, java] ---- public class MyListener implements ConsumerSeekAware { @@ -93,7 +93,7 @@ To arbitrarily seek at runtime, use the callback reference from the `registerSee Here is a trivial Spring Boot application that demonstrates how to use the callback; it sends 10 records to the topic; hitting `` in the console causes all partitions to seek to the beginning. -[source,java] +[source, java] ---- @SpringBootApplication public class SeekExampleApplication { @@ -166,7 +166,7 @@ To make things simpler, version 2.3 added the `AbstractConsumerSeekAware` class, The following example shows how to seek to the last record processed, in each partition, each time the container goes idle. It also has methods that allow arbitrary external calls to rewind partitions by one record. -[source,java] +[source, java] ---- public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware { @@ -210,7 +210,7 @@ Version 2.6 added convenience methods to the abstract class: Example: -[source,java] +[source, java] ---- public class MyListener extends AbstractConsumerSeekAware { From 4248be53fcd203969ac54e5ab46646a1781c8c39 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 09:16:11 +0900 Subject: [PATCH 10/14] Revert the existing javadocs --- .../kafka/listener/ConsumerSeekAware.java | 55 ++++++++----------- 1 file changed, 24 insertions(+), 31 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index 0f8b1cff07..fec4f79a62 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -41,7 +41,6 @@ public interface ConsumerSeekAware { * {@code ConcurrentMessageListenerContainer} or the same listener instance in multiple * containers listeners should store the callback in a {@code ThreadLocal} or a map keyed * by the thread. - * * @param callback the callback. */ default void registerSeekCallback(ConsumerSeekCallback callback) { @@ -49,9 +48,8 @@ default void registerSeekCallback(ConsumerSeekCallback callback) { /** * When using group management, called when partition assignments change. - * * @param assignments the new assignments and their current offsets. - * @param callback the callback to perform an initial seek after assignment. + * @param callback the callback to perform an initial seek after assignment. */ default void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback) { } @@ -60,7 +58,6 @@ default void onPartitionsAssigned(Map assignments, Consume * When using group management, called when partition assignments are revoked. * Listeners should discard any callback saved from * {@link #registerSeekCallback(ConsumerSeekCallback)} on this thread. - * * @param partitions the partitions that have been revoked. * @since 2.3 */ @@ -70,9 +67,8 @@ default void onPartitionsRevoked(Collection partitions) { /** * If the container is configured to emit idle container events, this method is called * when the container idle event is emitted - allowing a seek operation. - * * @param assignments the new assignments and their current offsets. - * @param callback the callback to perform a seek. + * @param callback the callback to perform a seek. */ default void onIdleContainer(Map assignments, ConsumerSeekCallback callback) { } @@ -81,7 +77,6 @@ default void onIdleContainer(Map assignments, ConsumerSeek * When using manual partition assignment, called when the first poll has completed; * useful when using {@code auto.offset.reset=latest} and you need to wait until the * initial position has been established. - * * @since 2.8.8 */ default void onFirstPoll() { @@ -90,12 +85,21 @@ default void onFirstPoll() { /** * Called when the listener consumer terminates allowing implementations to clean up * state, such as thread locals. - * * @since 2.4 */ default void unregisterSeekCallback() { } + /** + * Determine if the consumer group ID for seeking matches the expected value. + * + * @return true if the group ID matches, false otherwise. + * @since 3.3 + */ + default boolean matchGroupId() { + return false; + } + /** * A callback that a listener can invoke to seek to a specific offset. */ @@ -109,10 +113,9 @@ interface ConsumerSeekCallback { * queue the seek operation to the consumer. The queued seek will occur after any * pending offset commits. The consumer must be currently assigned the specified * partition. - * - * @param topic the topic. + * @param topic the topic. * @param partition the partition. - * @param offset the offset (absolute). + * @param offset the offset (absolute). */ void seek(String topic, int partition, long offset); @@ -126,9 +129,8 @@ interface ConsumerSeekCallback { * queue the seek operation to the consumer. The queued seek will occur after any * pending offset commits. The consumer must be currently assigned the specified * partition. - * - * @param topic the topic. - * @param partition the partition. + * @param topic the topic. + * @param partition the partition. * @param offsetComputeFunction function to compute the absolute offset to seek to. * @since 3.2.0 */ @@ -142,8 +144,7 @@ interface ConsumerSeekCallback { * the seek operation to the consumer. The queued seek will occur after * any pending offset commits. The consumer must be currently assigned the * specified partition. - * - * @param topic the topic. + * @param topic the topic. * @param partition the partition. */ void seekToBeginning(String topic, int partition); @@ -156,7 +157,6 @@ interface ConsumerSeekCallback { * queue the seek operation to the consumer for each * {@link TopicPartition}. The seek will occur after any pending offset commits. * The consumer must be currently assigned the specified partition(s). - * * @param partitions the {@link TopicPartition}s. * @since 2.3.4 */ @@ -172,8 +172,7 @@ default void seekToBeginning(Collection partitions) { * the seek operation to the consumer. The queued seek will occur after any * pending offset commits. The consumer must be currently assigned the specified * partition. - * - * @param topic the topic. + * @param topic the topic. * @param partition the partition. */ void seekToEnd(String topic, int partition); @@ -186,7 +185,6 @@ default void seekToBeginning(Collection partitions) { * the seek operation to the consumer for each {@link TopicPartition}. The queued * seek(s) will occur after any pending offset commits. The consumer must be * currently assigned the specified partition(s). - * * @param partitions the {@link TopicPartition}s. * @since 2.3.4 */ @@ -201,13 +199,12 @@ default void seekToEnd(Collection partitions) { * perform the seek immediately on the consumer. When called from elsewhere, queue * the seek operation. The queued seek will occur after any pending offset * commits. The consumer must be currently assigned the specified partition. - * - * @param topic the topic. + * @param topic the topic. * @param partition the partition. - * @param offset the offset; positive values are relative to the start, negative - * values are relative to the end, unless toCurrent is true. + * @param offset the offset; positive values are relative to the start, negative + * values are relative to the end, unless toCurrent is true. * @param toCurrent true for the offset to be relative to the current position - * rather than the beginning or end. + * rather than the beginning or end. * @since 2.3 */ void seekRelative(String topic, int partition, long offset, boolean toCurrent); @@ -222,8 +219,7 @@ default void seekToEnd(Collection partitions) { * commits. The consumer must be currently assigned the specified partition. Use * {@link #seekToTimestamp(Collection, long)} when seeking multiple partitions * because the offset lookup is blocking. - * - * @param topic the topic. + * @param topic the topic. * @param partition the partition. * @param timestamp the time stamp. * @since 2.3 @@ -239,19 +235,16 @@ default void seekToEnd(Collection partitions) { * perform the seek immediately on the consumer. When called from elsewhere, queue * the seek operation. The queued seek will occur after any pending offset * commits. The consumer must be currently assigned the specified partition. - * * @param topicPartitions the topic/partitions. - * @param timestamp the time stamp. + * @param timestamp the time stamp. * @since 2.3 */ void seekToTimestamp(Collection topicPartitions, long timestamp); - /** * Retrieve the group ID associated with this consumer seek callback, if available. * This method returns {@code null} by default, indicating that the group ID is not specified. * Implementations may override this method to provide a specific group ID value. - * * @return the consumer group ID. * @since 3.3 */ From 74a533aac31d4e11d614164d656fd2bde7ff6063 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 09:18:59 +0900 Subject: [PATCH 11/14] Use consumerGroupId field instead --- .../kafka/listener/KafkaMessageListenerContainer.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index f1f84940be..3755c62620 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -1907,7 +1907,7 @@ private void wrapUp(@Nullable Throwable throwable) { this.consumerSeekAwareListener.onPartitionsRevoked(partitions); this.consumerSeekAwareListener.unregisterSeekCallback(); } - this.logger.info(() -> KafkaMessageListenerContainer.this.getGroupId() + ": Consumer stopped"); + this.logger.info(() -> this.consumerGroupId + ": Consumer stopped"); publishConsumerStoppedEvent(throwable); } @@ -2694,7 +2694,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation( this.containerProperties.getObservationConvention(), DefaultKafkaListenerObservationConvention.INSTANCE, - () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), KafkaMessageListenerContainer.this.getGroupId(), + () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId, this::clusterId), this.observationRegistry); return observation.observe(() -> { @@ -3330,7 +3330,7 @@ public void seekToTimestamp(Collection topicParts, long timestam @Override public String getGroupId() { - return KafkaMessageListenerContainer.this.getGroupId(); + return this.consumerGroupId; } @Override From f1e15cea280fb9499f79a4892157917673ce685d Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 09:51:03 +0900 Subject: [PATCH 12/14] Remove useless method --- .../kafka/listener/ConsumerSeekAware.java | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index fec4f79a62..fdac0a661f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -90,16 +90,6 @@ default void onFirstPoll() { default void unregisterSeekCallback() { } - /** - * Determine if the consumer group ID for seeking matches the expected value. - * - * @return true if the group ID matches, false otherwise. - * @since 3.3 - */ - default boolean matchGroupId() { - return false; - } - /** * A callback that a listener can invoke to seek to a specific offset. */ From 0b1aae414451f3cfb6144eed40ab18f4619beeb2 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 11:32:10 +0900 Subject: [PATCH 13/14] Modify test --- .../kafka/listener/AbstractConsumerSeekAwareTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java index 3d613afc57..3305424052 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -88,8 +88,8 @@ void seekForSpecificGroup() throws Exception { MultiGroupListener.latch2 = new CountDownLatch(2); multiGroupListener.seekToBeginningForGroup("group2"); - assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2); assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch1.await(100, TimeUnit.MICROSECONDS)).isFalse(); } @EnableKafka From b34a610c9262ab192922c84f1e0435553b01ea58 Mon Sep 17 00:00:00 2001 From: Borahm Lee Date: Tue, 25 Jun 2024 11:46:04 +0900 Subject: [PATCH 14/14] Add assert --- .../kafka/listener/AbstractConsumerSeekAwareTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java index 3305424052..169987e1a8 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -90,6 +90,7 @@ void seekForSpecificGroup() throws Exception { multiGroupListener.seekToBeginningForGroup("group2"); assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); assertThat(MultiGroupListener.latch1.await(100, TimeUnit.MICROSECONDS)).isFalse(); + assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2); } @EnableKafka