Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onPartitionsRevoked(Collection<TopicPartition> partitions);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
----
Expand Down Expand Up @@ -49,6 +49,8 @@ void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

String getGroupId();
----

The two different variants of the `seek` methods provide a way to seek to an arbitrary offset.
Expand Down Expand Up @@ -232,4 +234,11 @@ public class SomeOtherBean {

----

As of version 3.3, a new method `getGroupId()` was introduced in the `ConsumerSeekAware.ConsumerSeekCallback` interface.
This method is particularly useful when you need to identify the consumer group associated with a specific seek callback.

NOTE: When using a class that extends `AbstractConsumerSeekAware`, a seek operation performed in one listener may impact all listeners in the same class.
This might not always be the desired behavior.
To address this, you can use the `getGroupId()` method provided by the callback.
This allows you to perform seek operations selectively, targeting only the consumer group of interest.

Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
This section covers the changes made from version 3.2 to version 3.3.
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].


[[x33-dlt-topic-naming]]
=== DLT Topic Naming Convention

The naming convention for DLT topics has been standardized to use the "-dlt" suffix consistently. This change ensures compatibility and avoids conflicts when transitioning between different retry solutions. Users who wish to retain the ".DLT" suffix behavior need to opt-in explicitly by setting the appropriate DLT name property.

[[x33-seek-with-group-id]]
=== Enhanced Seek Operations for Consumer Groups


A new method, `getGroupId()`, has been added to the `ConsumerSeekCallback` interface.
This method allows for more selective seek operations by targeting only the desired consumer group.
For more details, see xref:kafka/seek.adoc#seek[Seek API Docs].


Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@

import org.apache.kafka.common.TopicPartition;

import org.springframework.lang.Nullable;

/**
* Listeners that implement this interface are provided with a
* {@link ConsumerSeekCallback} which can be used to perform a
* seek operation.
*
* @author Gary Russell
* @author Soby Chacko
* @author Borahm Lee
* @since 1.1
*
*/
public interface ConsumerSeekAware {

Expand All @@ -39,15 +41,17 @@ public interface ConsumerSeekAware {
* {@code ConcurrentMessageListenerContainer} or the same listener instance in multiple
* containers listeners should store the callback in a {@code ThreadLocal} or a map keyed
* by the thread.
*
* @param callback the callback.
*/
default void registerSeekCallback(ConsumerSeekCallback callback) {
}

/**
* When using group management, called when partition assignments change.
*
* @param assignments the new assignments and their current offsets.
* @param callback the callback to perform an initial seek after assignment.
* @param callback the callback to perform an initial seek after assignment.
*/
default void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
Expand All @@ -56,6 +60,7 @@ default void onPartitionsAssigned(Map<TopicPartition, Long> assignments, Consume
* When using group management, called when partition assignments are revoked.
* Listeners should discard any callback saved from
* {@link #registerSeekCallback(ConsumerSeekCallback)} on this thread.
*
* @param partitions the partitions that have been revoked.
* @since 2.3
*/
Expand All @@ -65,8 +70,9 @@ default void onPartitionsRevoked(Collection<TopicPartition> partitions) {
/**
* If the container is configured to emit idle container events, this method is called
* when the container idle event is emitted - allowing a seek operation.
*
* @param assignments the new assignments and their current offsets.
* @param callback the callback to perform a seek.
* @param callback the callback to perform a seek.
*/
default void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
Expand All @@ -75,6 +81,7 @@ default void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeek
* When using manual partition assignment, called when the first poll has completed;
* useful when using {@code auto.offset.reset=latest} and you need to wait until the
* initial position has been established.
*
* @since 2.8.8
*/
default void onFirstPoll() {
Expand All @@ -83,6 +90,7 @@ default void onFirstPoll() {
/**
* Called when the listener consumer terminates allowing implementations to clean up
* state, such as thread locals.
*
* @since 2.4
*/
default void unregisterSeekCallback() {
Expand All @@ -101,9 +109,10 @@ interface ConsumerSeekCallback {
* queue the seek operation to the consumer. The queued seek will occur after any
* pending offset commits. The consumer must be currently assigned the specified
* partition.
* @param topic the topic.
*
* @param topic the topic.
* @param partition the partition.
* @param offset the offset (absolute).
* @param offset the offset (absolute).
*/
void seek(String topic, int partition, long offset);

Expand All @@ -117,8 +126,9 @@ interface ConsumerSeekCallback {
* queue the seek operation to the consumer. The queued seek will occur after any
* pending offset commits. The consumer must be currently assigned the specified
* partition.
* @param topic the topic.
* @param partition the partition.
*
* @param topic the topic.
* @param partition the partition.
* @param offsetComputeFunction function to compute the absolute offset to seek to.
* @since 3.2.0
*/
Expand All @@ -132,7 +142,8 @@ interface ConsumerSeekCallback {
* the seek operation to the consumer. The queued seek will occur after
* any pending offset commits. The consumer must be currently assigned the
* specified partition.
* @param topic the topic.
*
* @param topic the topic.
* @param partition the partition.
*/
void seekToBeginning(String topic, int partition);
Expand All @@ -145,6 +156,7 @@ interface ConsumerSeekCallback {
* queue the seek operation to the consumer for each
* {@link TopicPartition}. The seek will occur after any pending offset commits.
* The consumer must be currently assigned the specified partition(s).
*
* @param partitions the {@link TopicPartition}s.
* @since 2.3.4
*/
Expand All @@ -160,7 +172,8 @@ default void seekToBeginning(Collection<TopicPartition> partitions) {
* the seek operation to the consumer. The queued seek will occur after any
* pending offset commits. The consumer must be currently assigned the specified
* partition.
* @param topic the topic.
*
* @param topic the topic.
* @param partition the partition.
*/
void seekToEnd(String topic, int partition);
Expand All @@ -173,6 +186,7 @@ default void seekToBeginning(Collection<TopicPartition> partitions) {
* the seek operation to the consumer for each {@link TopicPartition}. The queued
* seek(s) will occur after any pending offset commits. The consumer must be
* currently assigned the specified partition(s).
*
* @param partitions the {@link TopicPartition}s.
* @since 2.3.4
*/
Expand All @@ -187,12 +201,13 @@ default void seekToEnd(Collection<TopicPartition> partitions) {
* perform the seek immediately on the consumer. When called from elsewhere, queue
* the seek operation. The queued seek will occur after any pending offset
* commits. The consumer must be currently assigned the specified partition.
* @param topic the topic.
*
* @param topic the topic.
* @param partition the partition.
* @param offset the offset; positive values are relative to the start, negative
* values are relative to the end, unless toCurrent is true.
* @param offset the offset; positive values are relative to the start, negative
* values are relative to the end, unless toCurrent is true.
* @param toCurrent true for the offset to be relative to the current position
* rather than the beginning or end.
* rather than the beginning or end.
* @since 2.3
*/
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
Expand All @@ -207,7 +222,8 @@ default void seekToEnd(Collection<TopicPartition> partitions) {
* commits. The consumer must be currently assigned the specified partition. Use
* {@link #seekToTimestamp(Collection, long)} when seeking multiple partitions
* because the offset lookup is blocking.
* @param topic the topic.
*
* @param topic the topic.
* @param partition the partition.
* @param timestamp the time stamp.
* @since 2.3
Expand All @@ -223,12 +239,26 @@ default void seekToEnd(Collection<TopicPartition> partitions) {
* perform the seek immediately on the consumer. When called from elsewhere, queue
* the seek operation. The queued seek will occur after any pending offset
* commits. The consumer must be currently assigned the specified partition.
*
* @param topicPartitions the topic/partitions.
* @param timestamp the time stamp.
* @param timestamp the time stamp.
* @since 2.3
*/
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);


/**
* Retrieve the group ID associated with this consumer seek callback, if available.
* This method returns {@code null} by default, indicating that the group ID is not specified.
* Implementations may override this method to provide a specific group ID value.
*
* @return the consumer group ID.
* @since 3.3
*/
@Nullable
default String getGroupId() {
return null;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
* @author Raphael Rösch
* @author Christian Mergenthaler
* @author Mikael Carlstedt
* @author Borahm Lee
*/
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
Expand Down Expand Up @@ -681,7 +682,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume

private final TransactionTemplate transactionTemplate;

private final String consumerGroupId = getGroupId();
private final String consumerGroupId = KafkaMessageListenerContainer.this.getGroupId();

private final TaskScheduler taskScheduler;

Expand Down Expand Up @@ -1362,8 +1363,8 @@ protected void initialize() {
}
publishConsumerStartingEvent();
this.consumerThread = Thread.currentThread();
setupSeeks();
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
setupSeeks();
this.count = 0;
this.last = System.currentTimeMillis();
initAssignedPartitions();
Expand Down Expand Up @@ -1906,7 +1907,7 @@ private void wrapUp(@Nullable Throwable throwable) {
this.consumerSeekAwareListener.onPartitionsRevoked(partitions);
this.consumerSeekAwareListener.unregisterSeekCallback();
}
this.logger.info(() -> getGroupId() + ": Consumer stopped");
this.logger.info(() -> KafkaMessageListenerContainer.this.getGroupId() + ": Consumer stopped");
publishConsumerStoppedEvent(throwable);
}

Expand Down Expand Up @@ -2693,7 +2694,7 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> cReco
Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
this.containerProperties.getObservationConvention(),
DefaultKafkaListenerObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), getGroupId(),
() -> new KafkaRecordReceiverContext(cRecord, getListenerId(), getClientId(), KafkaMessageListenerContainer.this.getGroupId(),
this::clusterId),
this.observationRegistry);
return observation.observe(() -> {
Expand Down Expand Up @@ -3327,6 +3328,11 @@ public void seekToTimestamp(Collection<TopicPartition> topicParts, long timestam
topicParts.forEach(tp -> seekToTimestamp(tp.topic(), tp.partition(), timestamp));
}

@Override
public String getGroupId() {
return KafkaMessageListenerContainer.this.getGroupId();
}

@Override
public String toString() {
return "KafkaMessageListenerContainer.ListenerConsumer ["
Expand Down
Loading