diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc index 42653c51d6..8b97c94cb4 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/seek.adoc @@ -9,7 +9,7 @@ void registerSeekCallback(ConsumerSeekCallback callback); void onPartitionsAssigned(Map assignments, ConsumerSeekCallback callback); -void onPartitionsRevoked(Collection partitions) +void onPartitionsRevoked(Collection partitions); void onIdleContainer(Map assignments, ConsumerSeekCallback callback); ---- @@ -49,6 +49,8 @@ void seekRelative(String topic, int partition, long offset, boolean toCurrent); void seekToTimestamp(String topic, int partition, long timestamp); void seekToTimestamp(Collection topicPartitions, long timestamp); + +String getGroupId(); ---- The two different variants of the `seek` methods provide a way to seek to an arbitrary offset. @@ -232,4 +234,11 @@ public class SomeOtherBean { ---- +As of version 3.3, a new method `getGroupId()` was introduced in the `ConsumerSeekAware.ConsumerSeekCallback` interface. +This method is particularly useful when you need to identify the consumer group associated with a specific seek callback. + +NOTE: When using a class that extends `AbstractConsumerSeekAware`, a seek operation performed in one listener may impact all listeners in the same class. +This might not always be the desired behavior. +To address this, you can use the `getGroupId()` method provided by the callback. +This allows you to perform seek operations selectively, targeting only the consumer group of interest. 
diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index be14dcdbde..345f1e99b7 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -7,13 +7,16 @@ This section covers the changes made from version 3.2 to version 3.3. For changes in earlier version, see xref:appendix/change-history.adoc[Change History]. - [[x33-dlt-topic-naming]] === DLT Topic Naming Convention The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property. +[[x33-seek-with-group-id]] +=== Enhanced Seek Operations for Consumer Groups - +A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` interface. +This method allows for more selective seek operations by targeting only the desired consumer group. +For more details, see xref:kafka/seek.adoc#seek[Seek API Docs]. 
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java index ae4c33cc6e..fdac0a661f 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerSeekAware.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.TopicPartition; +import org.springframework.lang.Nullable; + /** * Listeners that implement this interface are provided with a * {@link ConsumerSeekCallback} which can be used to perform a @@ -29,8 +31,8 @@ * * @author Gary Russell * @author Soby Chacko + * @author Borahm Lee * @since 1.1 - * */ public interface ConsumerSeekAware { @@ -229,6 +231,17 @@ default void seekToEnd(Collection partitions) { */ void seekToTimestamp(Collection topicPartitions, long timestamp); + /** + * Retrieve the group ID associated with this consumer seek callback, if available. + * This method returns {@code null} by default, indicating that the group ID is not specified. + * Implementations may override this method to provide a specific group ID value. + * @return the consumer group ID, or {@code null} if not available. 
+ * @since 3.3 + */ + @Nullable + default String getGroupId() { + return null; + } } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index 17653683a0..3755c62620 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -165,6 +165,7 @@ * @author Raphael Rösch * @author Christian Mergenthaler * @author Mikael Carlstedt + * @author Borahm Lee */ public class KafkaMessageListenerContainer // NOSONAR line count extends AbstractMessageListenerContainer implements ConsumerPauseResumeEventPublisher { @@ -681,7 +682,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume private final TransactionTemplate transactionTemplate; - private final String consumerGroupId = getGroupId(); + private final String consumerGroupId = KafkaMessageListenerContainer.this.getGroupId(); private final TaskScheduler taskScheduler; @@ -1362,8 +1363,8 @@ protected void initialize() { } publishConsumerStartingEvent(); this.consumerThread = Thread.currentThread(); - setupSeeks(); KafkaUtils.setConsumerGroupId(this.consumerGroupId); + setupSeeks(); this.count = 0; this.last = System.currentTimeMillis(); initAssignedPartitions(); @@ -1906,7 +1907,7 @@ private void wrapUp(@Nullable Throwable throwable) { this.consumerSeekAwareListener.onPartitionsRevoked(partitions); this.consumerSeekAwareListener.unregisterSeekCallback(); } - this.logger.info(() -> getGroupId() + ": Consumer stopped"); + this.logger.info(() -> this.consumerGroupId + ": Consumer stopped"); publishConsumerStoppedEvent(throwable); } @@ -2693,7 +2694,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord cReco Observation observation = 
KafkaListenerObservation.LISTENER_OBSERVATION.observation( this.containerProperties.getObservationConvention(), DefaultKafkaListenerObservationConvention.INSTANCE, - () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(), + () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), this.consumerGroupId, this::clusterId), this.observationRegistry); return observation.observe(() -> { @@ -3327,6 +3328,11 @@ public void seekToTimestamp(Collection topicParts, long timestam topicParts.forEach(tp -> seekToTimestamp(tp.topic(), tp.partition(), timestamp)); } + @Override + public String getGroupId() { + return this.consumerGroupId; + } + @Override public String toString() { return "KafkaMessageListenerContainer.ListenerConsumer [" diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java new file mode 100644 index 0000000000..169987e1a8 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/AbstractConsumerSeekAwareTests.java @@ -0,0 +1,153 @@ +/* + * Copyright 2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.springframework.kafka.listener; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.AbstractConsumerSeekAwareTests.Config.MultiGroupListener; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.stereotype.Component; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +/** + * @author Borahm Lee + * @since 3.3 + */ +@DirtiesContext +@SpringJUnitConfig +@EmbeddedKafka(topics = {AbstractConsumerSeekAwareTests.TOPIC}, partitions = 3) +class AbstractConsumerSeekAwareTests { + + static final String TOPIC = "Seek"; + + @Autowired + Config config; + + @Autowired + KafkaTemplate template; + + @Autowired + MultiGroupListener multiGroupListener; + + @Test + void seekForAllGroups() throws Exception { + template.send(TOPIC, "test-data"); + template.send(TOPIC, "test-data"); + assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + 
assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + + MultiGroupListener.latch1 = new CountDownLatch(2); + MultiGroupListener.latch2 = new CountDownLatch(2); + + multiGroupListener.seekToBeginning(); + assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @Test + void seekForSpecificGroup() throws Exception { + template.send(TOPIC, "test-data"); + template.send(TOPIC, "test-data"); + assertThat(MultiGroupListener.latch1.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + + MultiGroupListener.latch1 = new CountDownLatch(2); + MultiGroupListener.latch2 = new CountDownLatch(2); + + multiGroupListener.seekToBeginningForGroup("group2"); + assertThat(MultiGroupListener.latch2.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(MultiGroupListener.latch1.await(100, TimeUnit.MICROSECONDS)).isFalse(); + assertThat(MultiGroupListener.latch1.getCount()).isEqualTo(2); + } + + @EnableKafka + @Configuration + static class Config { + + @Autowired + EmbeddedKafkaBroker broker; + + @Bean + ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + ConsumerFactory consumerFactory) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory); + return factory; + } + + @Bean + ConsumerFactory consumerFactory() { + return new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("test-group", "false", this.broker)); + } + + @Bean + ProducerFactory producerFactory() { + return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(this.broker)); + } + + @Bean + KafkaTemplate template(ProducerFactory pf) { + return new KafkaTemplate<>(pf); + } + + @Component + static class MultiGroupListener extends AbstractConsumerSeekAware { + + static CountDownLatch latch1 = new 
CountDownLatch(2); + + static CountDownLatch latch2 = new CountDownLatch(2); + + @KafkaListener(groupId = "group1", topics = TOPIC) + void listenForGroup1(String in) { + latch1.countDown(); + } + + @KafkaListener(groupId = "group2", topics = TOPIC) + void listenForGroup2(String in) { + latch2.countDown(); + } + + void seekToBeginningForGroup(String groupIdForSeek) { + getCallbacksAndTopics().forEach((cb, topics) -> { + if (groupIdForSeek.equals(cb.getGroupId())) { + topics.forEach(tp -> cb.seekToBeginning(tp.topic(), tp.partition())); + } + }); + } + } + } + +}