Skip to content

Commit 28e34c3

Browse files
brun0-4ugustorafaelcgpava
authored andcommitted
AsyncDefaultErrorHandler changes msg visibility timeout to zero. (#1314)
1 parent 79ac8be commit 28e34c3

3 files changed

Lines changed: 299 additions & 1 deletion

File tree

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener.errorhandler;
17+
18+
import io.awspring.cloud.sqs.MessageHeaderUtils;
19+
import io.awspring.cloud.sqs.listener.QueueMessageVisibility;
20+
import io.awspring.cloud.sqs.listener.SqsHeaders;
21+
import io.awspring.cloud.sqs.listener.Visibility;
22+
import java.util.Collection;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.stream.Collectors;
25+
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
import org.springframework.messaging.Message;
29+
30+
/**
31+
* A default error handler implementation for asynchronous message processing.
32+
*
33+
* <p>
34+
* This error handler attempts to set the SQS message visibility timeout to zero whenever an exception occurs,
35+
* effectively making the message immediately available for reprocessing.
36+
*
37+
* <p>
38+
* Returning a failed future ensures that the message is <strong>not acknowledged</strong>.
39+
* If a successful future were returned, the message would be considered
40+
* successfully recovered and acknowledged.
41+
* @author Bruno Garcia
42+
* @author Rafael Pavarini
43+
*/
44+
public class AsyncDefaultErrorHandler<T> implements AsyncErrorHandler<T> {
45+
46+
@Override
47+
public CompletableFuture<Void> handle(Message<T> message, Throwable t) {
48+
changeTimeoutToZero(message);
49+
return CompletableFuture.failedFuture(t);
50+
}
51+
52+
@Override
53+
public CompletableFuture<Void> handle(Collection<Message<T>> messages, Throwable t) {
54+
changeTimeoutToZero(messages);
55+
return CompletableFuture.failedFuture(t);
56+
}
57+
58+
private void changeTimeoutToZero(Message<T> message) {
59+
Visibility visibilityTimeout = getVisibilityTimeout(message);
60+
visibilityTimeout.changeToAsync(0);
61+
}
62+
63+
private void changeTimeoutToZero(Collection<Message<T>> messages) {
64+
QueueMessageVisibility firstVisibilityMessage = (QueueMessageVisibility) getVisibilityTimeout(messages.iterator().next());
65+
66+
Collection<Message<?>> castedMessages = messages.stream()
67+
.map(m -> (Message<?>) m)
68+
.collect(Collectors.toList());
69+
70+
firstVisibilityMessage.toBatchVisibility(castedMessages).changeToAsync(0);
71+
}
72+
73+
private Visibility getVisibilityTimeout(Message<T> message) {
74+
return MessageHeaderUtils.getHeader(message, SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER, Visibility.class);
75+
}
76+
}

spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/integration/SqsIntegrationTests.java

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import io.awspring.cloud.sqs.listener.acknowledgement.BatchAcknowledgement;
4444
import io.awspring.cloud.sqs.listener.acknowledgement.SqsAcknowledgementExecutor;
4545
import io.awspring.cloud.sqs.listener.acknowledgement.handler.AcknowledgementMode;
46+
import io.awspring.cloud.sqs.listener.errorhandler.AsyncDefaultErrorHandler;
4647
import io.awspring.cloud.sqs.listener.errorhandler.AsyncErrorHandler;
4748
import io.awspring.cloud.sqs.listener.interceptor.AsyncMessageInterceptor;
4849
import io.awspring.cloud.sqs.listener.sink.MessageSink;
@@ -52,11 +53,13 @@
5253
import java.lang.reflect.Method;
5354
import java.time.Duration;
5455
import java.util.ArrayList;
56+
import java.util.Map;
5557
import java.util.Collection;
5658
import java.util.Collections;
5759
import java.util.List;
5860
import java.util.UUID;
5961
import java.util.concurrent.BrokenBarrierException;
62+
import java.util.concurrent.ConcurrentHashMap;
6063
import java.util.concurrent.CompletableFuture;
6164
import java.util.concurrent.CountDownLatch;
6265
import java.util.concurrent.CyclicBarrier;
@@ -93,6 +96,8 @@
9396
* @author Mikhail Strokov
9497
* @author Michael Sosa
9598
* @author gustavomonarin
99+
* @author Bruno Garcia
100+
* @author Rafael Pavarini
96101
*/
97102
@SpringBootTest
98103
@TestPropertySource(properties = { "property.one=1", "property.five.seconds=5s",
@@ -130,6 +135,10 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest {
130135

131136
static final String MAX_CONCURRENT_MESSAGES_QUEUE_NAME = "max_concurrent_messages_test_queue";
132137

138+
static final String SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME = "success_visibility_timeout_to_zero_test_queue";
139+
140+
static final String SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME = "success_visibility_batch_timeout_to_zero_test_queue";
141+
133142
static final String LOW_RESOURCE_FACTORY = "lowResourceFactory";
134143

135144
static final String MANUAL_ACK_FACTORY = "manualAcknowledgementFactory";
@@ -138,6 +147,8 @@ class SqsIntegrationTests extends BaseSqsIntegrationTest {
138147

139148
static final String ACK_AFTER_SECOND_ERROR_FACTORY = "ackAfterSecondErrorFactory";
140149

150+
static final String SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_FACTORY = "receivesMessageErrorFactory";
151+
141152
@BeforeAll
142153
static void beforeTests() {
143154
SqsAsyncClient client = createAsyncClient();
@@ -158,7 +169,12 @@ static void beforeTests() {
158169
createQueue(client, MANUALLY_CREATE_INACTIVE_CONTAINER_QUEUE_NAME),
159170
createQueue(client, MANUALLY_CREATE_FACTORY_QUEUE_NAME),
160171
createQueue(client, CONSUMES_ONE_MESSAGE_AT_A_TIME_QUEUE_NAME),
161-
createQueue(client, MAX_CONCURRENT_MESSAGES_QUEUE_NAME)).join();
172+
createQueue(client, MAX_CONCURRENT_MESSAGES_QUEUE_NAME),
173+
createQueue(client, SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME,
174+
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "500")),
175+
createQueue(client, SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME,
176+
singletonMap(QueueAttributeName.VISIBILITY_TIMEOUT, "500")))
177+
.join();
162178
}
163179

164180
@Autowired
@@ -184,6 +200,27 @@ void receivesMessage() throws Exception {
184200
assertThat(latchContainer.acknowledgementCallbackSuccessLatch.await(10, TimeUnit.SECONDS)).isTrue();
185201
}
186202

203+
@Test
204+
void receivesMessageVisibilityTimeout() throws Exception {
205+
String messageBody = UUID.randomUUID().toString();
206+
sqsTemplate.send(SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME, messageBody);
207+
logger.debug("Sent message to queue {} with messageBody {}", SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME,
208+
messageBody);
209+
210+
assertThat(latchContainer.receivesRetryMessageQuicklyLatch.await(10, TimeUnit.SECONDS)).isTrue();
211+
}
212+
213+
@Test
214+
void receivesMessageVisibilityTimeoutBatch() throws Exception {
215+
List<Message<String>> messages = create10Messages("receivesMessageVisibilityTimeoutBatch");
216+
217+
sqsTemplate.sendManyAsync(SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME, messages);
218+
logger.debug("Sent message to queue {} with messageBody {}",
219+
SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME, messages);
220+
221+
assertThat(latchContainer.receivesRetryBatchMessageQuicklyLatch.await(10, TimeUnit.SECONDS)).isTrue();
222+
}
223+
187224
@Test
188225
void receivesMessageBatch() throws Exception {
189226
List<Message<String>> messages = create10Messages("receivesMessageBatch");
@@ -425,6 +462,68 @@ CompletableFuture<Void> listen(String message, @Header(SqsHeaders.SQS_QUEUE_NAME
425462
}
426463
}
427464

465+
static class ErrorHandlerVisibilityTest {
466+
467+
@Autowired
468+
LatchContainer latchContainer;
469+
470+
private static final Map<String, Long> previousReceivedMessageTimestamps = new ConcurrentHashMap<>();
471+
472+
private static final int MAX_EXPECTED_ELAPSED_TIME_BETWEEN_MSG_RECEIVES_IN_MS = 5000;
473+
474+
@SqsListener(queueNames = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_QUEUE_NAME, messageVisibilitySeconds = "500", factory = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_FACTORY, id = "visibilityErrHandler")
475+
CompletableFuture<Void> listen(Message<String> message,
476+
@Header(SqsHeaders.SQS_QUEUE_NAME_HEADER) String queueName) {
477+
logger.info("Received message {} from queue {}", message, queueName);
478+
String msgId = MessageHeaderUtils.getHeader(message, "id", UUID.class).toString();
479+
Long prevReceivedMessageTimestamp = previousReceivedMessageTimestamps.get(msgId);
480+
if (prevReceivedMessageTimestamp == null) {
481+
previousReceivedMessageTimestamps.put(msgId, System.currentTimeMillis());
482+
return CompletableFuture
483+
.failedFuture(new RuntimeException("Expected exception from visibility-err-handler"));
484+
}
485+
486+
long elapsedTimeBetweenMessageReceivesInMs = System.currentTimeMillis() - prevReceivedMessageTimestamp;
487+
if (elapsedTimeBetweenMessageReceivesInMs < MAX_EXPECTED_ELAPSED_TIME_BETWEEN_MSG_RECEIVES_IN_MS) {
488+
latchContainer.receivesRetryMessageQuicklyLatch.countDown();
489+
}
490+
491+
return CompletableFuture.completedFuture(null);
492+
}
493+
}
494+
495+
static class ErrorHandlerVisibilityBatchTest {
496+
497+
@Autowired
498+
LatchContainer latchContainer;
499+
500+
private static final Map<String, Long> previousReceivedMessageTimestamps = new ConcurrentHashMap<>();
501+
502+
private static final int MAX_EXPECTED_ELAPSED_TIME_BETWEEN_BATCH_MSG_RECEIVES_IN_MS = 5000;
503+
504+
@SqsListener(queueNames = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_BATCH_QUEUE_NAME, messageVisibilitySeconds = "500", factory = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_FACTORY, id = "visibilityBatchErrHandler")
505+
CompletableFuture<Void> listen(List<Message<String>> messages) {
506+
logger.info("Received messages {} from queue {}", MessageHeaderUtils.getId(messages),
507+
messages.get(0).getHeaders().get(SqsHeaders.SQS_QUEUE_NAME_HEADER));
508+
509+
for(Message<String> message : messages) {
510+
String msgId = MessageHeaderUtils.getHeader(message, "id", UUID.class).toString();
511+
if (!previousReceivedMessageTimestamps.containsKey(msgId)) {
512+
previousReceivedMessageTimestamps.put(msgId, System.currentTimeMillis());
513+
return CompletableFuture.failedFuture(new RuntimeException("Expected exception from visibility-err-handler"));
514+
}
515+
else {
516+
long timediff = System.currentTimeMillis() - previousReceivedMessageTimestamps.get(msgId);
517+
if (MAX_EXPECTED_ELAPSED_TIME_BETWEEN_BATCH_MSG_RECEIVES_IN_MS > timediff) {
518+
latchContainer.receivesRetryBatchMessageQuicklyLatch.countDown();
519+
}
520+
}
521+
}
522+
523+
return CompletableFuture.completedFuture(null);
524+
}
525+
}
526+
428527
static class DoesNotAckOnErrorBatchListener {
429528

430529
@Autowired
@@ -500,6 +599,8 @@ void listen(String message) throws BrokenBarrierException, InterruptedException
500599
static class LatchContainer {
501600

502601
final CountDownLatch receivesMessageLatch = new CountDownLatch(1);
602+
final CountDownLatch receivesRetryMessageQuicklyLatch = new CountDownLatch(1);
603+
final CountDownLatch receivesRetryBatchMessageQuicklyLatch = new CountDownLatch(10);
503604
final CountDownLatch receivesMessageBatchLatch = new CountDownLatch(20);
504605
final CountDownLatch receivesMessageAsyncLatch = new CountDownLatch(1);
505606
final CountDownLatch doesNotAckLatch = new CountDownLatch(2);
@@ -576,6 +677,21 @@ public SqsMessageListenerContainerFactory<Object> ackAfterSecondErrorFactory() {
576677
.build();
577678
}
578679

680+
@Bean(name = SUCCESS_VISIBILITY_TIMEOUT_TO_ZERO_FACTORY)
681+
public SqsMessageListenerContainerFactory<Object> errorHandlerVisibility() {
682+
return SqsMessageListenerContainerFactory
683+
.builder()
684+
.configure(options -> options
685+
.maxConcurrentMessages(10)
686+
.pollTimeout(Duration.ofSeconds(10))
687+
.maxMessagesPerPoll(10)
688+
.queueAttributeNames(Collections.singletonList(QueueAttributeName.QUEUE_ARN))
689+
.maxDelayBetweenPolls(Duration.ofSeconds(10)))
690+
.errorHandler(new AsyncDefaultErrorHandler<>())
691+
.sqsAsyncClientSupplier(BaseSqsIntegrationTest::createAsyncClient)
692+
.build();
693+
}
694+
579695
private List<ContainerComponentFactory<Object, SqsContainerOptions>> getExceptionThrowingAckExecutor() {
580696
return Collections.singletonList(new StandardSqsComponentFactory<Object>() {
581697
@Override
@@ -738,6 +854,16 @@ DoesNotAckOnErrorAsyncListener doesNotAckOnErrorAsyncListener() {
738854
return new DoesNotAckOnErrorAsyncListener();
739855
}
740856

857+
@Bean
858+
ErrorHandlerVisibilityTest errorHandlerVisibilityTest() {
859+
return new ErrorHandlerVisibilityTest();
860+
}
861+
862+
@Bean
863+
ErrorHandlerVisibilityBatchTest errorHandlerVisibilityBatchTest() {
864+
return new ErrorHandlerVisibilityBatchTest();
865+
}
866+
741867
@Bean
742868
DoesNotAckOnErrorBatchListener doesNotAckOnErrorBatchListener() {
743869
return new DoesNotAckOnErrorBatchListener();
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2013-2025 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.awspring.cloud.sqs.listener.errorhandler;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static org.mockito.Mockito.*;
21+
22+
import io.awspring.cloud.sqs.CompletableFutures;
23+
import io.awspring.cloud.sqs.listener.BatchVisibility;
24+
import io.awspring.cloud.sqs.listener.QueueMessageVisibility;
25+
import io.awspring.cloud.sqs.listener.SqsHeaders;
26+
import io.awspring.cloud.sqs.listener.Visibility;
27+
import java.util.Arrays;
28+
import java.util.List;
29+
import java.util.concurrent.CompletableFuture;
30+
31+
import org.assertj.core.api.Condition;
32+
import org.junit.jupiter.api.Test;
33+
import org.springframework.messaging.Message;
34+
import org.springframework.messaging.MessageHeaders;
35+
36+
/**
37+
* Tests for {@link AsyncDefaultErrorHandler}.
38+
*
39+
* @author Bruno Garcia
40+
* @author Rafael Pavarini
41+
*/
42+
@SuppressWarnings("unchecked")
43+
class AsyncDefaultErrorHandlerTests {
44+
45+
@Test
46+
void shouldChangeVisibilityToZero() {
47+
Message<Object> message = mock(Message.class);
48+
RuntimeException exception = new RuntimeException("Expected exception from shouldChangeVisibilityToZero");
49+
MessageHeaders headers = mock(MessageHeaders.class);
50+
Visibility visibility = mock(Visibility.class);
51+
when(message.getHeaders()).thenReturn(headers);
52+
when(headers.get(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER, Visibility.class)).thenReturn(visibility);
53+
when(visibility.changeToAsync(0)).thenReturn(CompletableFuture.completedFuture(null));
54+
55+
AsyncDefaultErrorHandler<Object> handler = new AsyncDefaultErrorHandler<>();
56+
57+
assertThat(handler.handle(message, exception)).isCompletedExceptionally();
58+
verify(visibility).changeToAsync(0);
59+
}
60+
61+
@Test
62+
void shouldReturnError() {
63+
Message<Object> message = mock(Message.class);
64+
RuntimeException exception = new RuntimeException("Expected exception from shouldReturnError");
65+
MessageHeaders headers = mock(MessageHeaders.class);
66+
when(headers.get(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER, Visibility.class)).thenReturn(null);
67+
when(message.getHeaders()).thenReturn(headers);
68+
69+
AsyncDefaultErrorHandler<Object> handler = new AsyncDefaultErrorHandler<>();
70+
71+
assertThatThrownBy(() -> handler.handle(message, exception));
72+
}
73+
74+
@Test
75+
void shouldChangeVisibilityToZeroBatch() {
76+
Message<Object> message1 = mock(Message.class);
77+
Message<Object> message2 = mock(Message.class);
78+
Message<Object> message3 = mock(Message.class);
79+
List<Message<Object>> batch = Arrays.asList(message1, message2, message3);
80+
RuntimeException exception = new RuntimeException("Expected exception from shouldChangeVisibilityToZeroBatch");
81+
MessageHeaders headers = mock(MessageHeaders.class);
82+
QueueMessageVisibility visibility = mock(QueueMessageVisibility.class);
83+
BatchVisibility batchvisibility = mock(BatchVisibility.class);
84+
when(batchvisibility.changeToAsync(0)).thenReturn(CompletableFuture.completedFuture(null));
85+
when(visibility.toBatchVisibility(any())).thenReturn(batchvisibility);
86+
when(headers.get(SqsHeaders.SQS_VISIBILITY_TIMEOUT_HEADER, Visibility.class)).thenReturn(visibility);
87+
when(message1.getHeaders()).thenReturn(headers);
88+
when(message2.getHeaders()).thenReturn(headers);
89+
when(message3.getHeaders()).thenReturn(headers);
90+
91+
AsyncDefaultErrorHandler<Object> handler = new AsyncDefaultErrorHandler<>();
92+
93+
assertThat(handler.handle(batch, exception)).isCompletedExceptionally();
94+
verify(batchvisibility, times(1)).changeToAsync(0);
95+
}
96+
}

0 commit comments

Comments
 (0)