Commit dffbb52

GH-3554: Doc for batch listener error handling with DLT
Fixes #3554. Adds documentation for how to use DLTs with batch-mode listeners. Clarifies exception classification, offset commit behavior, and deserialization error handling patterns. Signed-off-by: Soby Chacko <[email protected]>
1 parent 15b3032 commit dffbb52

File tree: 1 file changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/annotation-error-handling.adoc

Lines changed: 223 additions & 1 deletion

@@ -300,8 +300,190 @@ Starting with version 2.9, this is now `true` by default.

Also see xref:kafka/annotation-error-handling.adoc#delivery-header[Delivery Attempts Header].

[[batch-listener-error-handling-dlt]]
== Batch Listener Error Handling with Dead Letter Topics

IMPORTANT: xref:retrytopic.adoc[Non-Blocking Retries] (the `@RetryableTopic` annotation) are NOT supported with batch listeners.
For Dead Letter Topic functionality with batch listeners, use `DefaultErrorHandler` with `DeadLetterPublishingRecoverer`.

[[batch-listener-failed-exception]]
=== Using BatchListenerFailedException

To indicate which specific record in a batch failed, throw a `BatchListenerFailedException`:

[source, java]
----
@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (Exception e) {
            // Identifies the failed record for error handling
            throw new BatchListenerFailedException("Failed to process", e, record);
        }
    }
}
----

For POJO batch listeners, where you don't have the `ConsumerRecord`, use the list index instead:

[source, java]
----
@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<Order> orders) {
    for (int i = 0; i < orders.size(); i++) {
        try {
            process(orders.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", e, i);
        }
    }
}
----

[[batch-listener-dlt-config]]
=== Configuring Dead Letter Topics for Batch Listeners

Configure a `DefaultErrorHandler` with a `DeadLetterPublishingRecoverer` on your batch listener container factory:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
        ConsumerFactory<String, Order> consumerFactory,
        KafkaTemplate<String, Order> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, Order> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);

    // Configure dead letter publishing: failed records go to <topic>-dlt,
    // preserving the original partition
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition(record.topic() + "-dlt", record.partition()));

    // Configure retries: 2 retries with 1 second between attempts (3 total delivery attempts)
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
            new FixedBackOff(1000L, 2L));

    factory.setCommonErrorHandler(errorHandler);
    return factory;
}
----
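
If you want the DLT record to carry information about why it failed, the recoverer's headers function can append custom headers to everything it publishes; a minimal sketch building on the `recoverer` above (the `x-failure-message` header name is an arbitrary choice for illustration, not a Spring default):

[source, java]
----
// Appends a diagnostic header to each record published to the DLT, in addition
// to the standard kafka_dlt-* headers the recoverer already adds
recoverer.setHeadersFunction((record, ex) -> {
    Headers headers = new RecordHeaders();
    String reason = ex.getMessage() != null ? ex.getMessage() : ex.getClass().getName();
    headers.add("x-failure-message", reason.getBytes(StandardCharsets.UTF_8));
    return headers;
});
----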

[[batch-listener-error-flow]]
=== How Batch Error Handling Works

When a `BatchListenerFailedException` is thrown, the `DefaultErrorHandler`:

1. *Commits offsets* for all records before the failed record
2. *Retries* the failed record (and subsequent records) according to the `BackOff` configuration
3. *Publishes to the DLT* when retries are exhausted - only the failed record is sent to the DLT
4. *Commits the failed record's offset* and redelivers the remaining records for processing

Example flow with a batch of 6 records where the record at index 2 fails:

* First attempt: records 0 and 1 are processed successfully; record 2 fails
* The container commits the offsets for records 0 and 1
* Retry attempt 1: records 2, 3, 4, 5 are retried
* Retry attempt 2: records 2, 3, 4, 5 are retried again
* After retries are exhausted: record 2 is published to the DLT and its offset is committed
* The container continues with records 3, 4, 5
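
To observe this retry/recovery flow at runtime, you can register a `RetryListener` on the error handler; a minimal logging-only sketch (`errorHandler` is the `DefaultErrorHandler` configured earlier, and `logger` is assumed to be an SLF4J logger):

[source, java]
----
errorHandler.setRetryListeners(new RetryListener() {

    @Override
    public void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt) {
        // Invoked for each failed delivery attempt of the failed record
        logger.info("Delivery attempt {} failed for offset {}", deliveryAttempt, record.offset());
    }

    @Override
    public void recovered(ConsumerRecord<?, ?> record, Exception ex) {
        // Invoked after the record has been successfully published to the DLT
        logger.info("Recovered (published to DLT): offset {}", record.offset());
    }

});
----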

[[batch-listener-skip-retries]]
=== Skipping Retries for Specific Exceptions

By default, the `DefaultErrorHandler` retries all exceptions except for fatal ones (such as `DeserializationException`, `MessageConversionException`, and so on).
To skip retries for your own exception types, configure the error handler with exception classifications.

The error handler examines the *cause* of the `BatchListenerFailedException` to determine whether it should skip retries:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
        ConsumerFactory<String, Order> consumerFactory,
        KafkaTemplate<String, Order> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, Order> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);

    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
    DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
            new FixedBackOff(1000L, 2L));

    // Add custom exception types that should skip retries and go directly to the DLT
    errorHandler.addNotRetryableExceptions(ValidationException.class, InvalidFormatException.class);

    factory.setCommonErrorHandler(errorHandler);
    return factory;
}
----

Now in your listener:

[source, java]
----
@KafkaListener(id = "batch-listener", topics = "orders", containerFactory = "batchFactory")
public void processOrders(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (DatabaseException e) {
            // Retried according to the BackOff configuration
            // (here, 2 retries / 3 total delivery attempts)
            throw new BatchListenerFailedException("Database error", e, record);
        }
        catch (ValidationException e) {
            // Skips retries - goes directly to the DLT
            // (because ValidationException is configured as not retryable)
            throw new BatchListenerFailedException("Validation failed", e, record);
        }
    }
}
----

IMPORTANT: The error handler checks the *cause* (the second constructor parameter) of the `BatchListenerFailedException`.
If the cause is classified as not retryable, the record is immediately sent to the DLT without retries.

[[batch-listener-offset-commits]]
=== Offset Commit Behavior

Understanding offset commits is important for batch error handling:

* `AckMode.BATCH` (most common for batch listeners):
** Offsets before the failed record are committed before error handling
** The failed record's offset is committed after successful recovery (DLT publishing)
* `AckMode.MANUAL_IMMEDIATE`:
** Set `errorHandler.setCommitRecovered(true)` to commit recovered record offsets (see the factory sketch below)
** You control acknowledgment timing in your listener
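
The `manualBatchFactory` referenced in the following example is not defined elsewhere in this section; a sketch of how it might look, assuming the same consumer factory and `KafkaTemplate` beans as the earlier examples:

[source, java]
----
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> manualBatchFactory(
        ConsumerFactory<String, Order> consumerFactory,
        KafkaTemplate<String, Order> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, Order> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

    DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            new DeadLetterPublishingRecoverer(kafkaTemplate), new FixedBackOff(1000L, 2L));
    // With manual ack modes, commit the recovered record's offset after DLT publishing
    errorHandler.setCommitRecovered(true);
    factory.setCommonErrorHandler(errorHandler);
    return factory;
}
----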

Example with manual acknowledgment:

[source, java]
----
@KafkaListener(id = "manual-batch", topics = "myTopic", containerFactory = "manualBatchFactory")
public void listen(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) {
    for (ConsumerRecord<String, Order> record : records) {
        try {
            process(record.value());
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Processing failed", e, record);
        }
    }
    ack.acknowledge();
}
----

[[batch-listener-conv-errors]]
=== Conversion Errors with Batch Error Handlers

Starting with version 2.8, batch listeners can properly handle conversion errors when using a `MessageConverter` with a `ByteArrayDeserializer`, a `BytesDeserializer`, or a `StringDeserializer`, together with a `DefaultErrorHandler`.
When a conversion error occurs, the payload is set to null and a deserialization exception is added to the record headers, similar to the `ErrorHandlingDeserializer`.
@@ -323,6 +505,46 @@ void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<Conve
}
----

[[batch-listener-deser-errors]]
=== Deserialization Errors with Batch Listeners

Use `ErrorHandlingDeserializer` to handle deserialization failures gracefully:

[source, java]
----
@Bean
public ConsumerFactory<String, Order> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    // Wrap your deserializer with ErrorHandlingDeserializer
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
    props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

    return new DefaultKafkaConsumerFactory<>(props);
}
----

In your listener, check for `null` values, which indicate deserialization failures:

[source, java]
----
@KafkaListener(id = "batch-deser", topics = "orders", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
    for (ConsumerRecord<String, Order> record : records) {
        if (record.value() == null) {
            // Deserialization failed - throw an exception to send the record to the DLT
            // The DeadLetterPublishingRecoverer will restore the original byte[] value
            throw new BatchListenerFailedException("Deserialization failed", record);
        }
        process(record.value());
    }
}
----

When `DeadLetterPublishingRecoverer` publishes deserialization failures to the DLT, it automatically restores the original `byte[]` value that failed to deserialize.
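
Because a `null` value can also be a legitimate tombstone record, you may prefer to check the headers for the deserialization exception instead of (or in addition to) the null check; a sketch using `SerializationUtils`, assuming `LOGGER` is a `LogAccessor`:

[source, java]
----
// The header is present only when the ErrorHandlingDeserializer recorded a failure
DeserializationException dex = SerializationUtils.getExceptionFromHeader(record,
        SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, LOGGER);
if (dex != null) {
    // dex.getData() holds the original byte[] payload that failed to deserialize
    throw new BatchListenerFailedException("Deserialization failed", dex, record);
}
----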

[[retrying-batch-eh]]
== Retrying Complete Batches
