diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 5f5ebbaee204..bf79813f74e7 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -65,6 +65,7 @@ class MessageDispatcher { @InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100); private final Executor executor; + private final SequentialExecutorService sequentialExecutor; private final ScheduledExecutorService systemExecutor; private final ApiClock clock; @@ -217,6 +218,7 @@ void sendAckOperations( jobLock = new ReentrantLock(); messagesWaiter = new MessageWaiter(); this.clock = clock; + this.sequentialExecutor = new SequentialExecutorService(executor); } public void start() { @@ -401,46 +403,51 @@ public void processOutstandingBatches() { outstandingMessageBatches.poll(); batchCallback = nextBatch.doneCallback; } - } - final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); - final AckHandler ackHandler = outstandingMessage.ackHandler(); - final SettableApiFuture response = SettableApiFuture.create(); - final AckReplyConsumer consumer = - new AckReplyConsumer() { - @Override - public void ack() { - response.set(AckReply.ACK); - } - - @Override - public void nack() { - response.set(AckReply.NACK); - } - }; - ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); - executor.execute( - new Runnable() { - @Override - public void run() { - try { - if (ackHandler - .totalExpiration - .plusSeconds(messageDeadlineSeconds.get()) - .isBefore(now())) { - // Message expired while waiting. We don't extend these messages anymore, - // so it was probably sent to someone else. Don't work on it. - // Don't nack it either, because we'd be nacking someone else's message. - ackHandler.forget(); - return; - } + final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); + final AckHandler ackHandler = outstandingMessage.ackHandler(); + final SettableApiFuture response = SettableApiFuture.create(); + final AckReplyConsumer consumer = + new AckReplyConsumer() { + @Override + public void ack() { + response.set(AckReply.ACK); + } - receiver.receiveMessage(message, consumer); - } catch (Exception e) { - response.setException(e); + @Override + public void nack() { + response.set(AckReply.NACK); } - } - }); + }; + ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); + Runnable deliverMessageTask = + new Runnable() { + @Override + public void run() { + try { + if (ackHandler + .totalExpiration + .plusSeconds(messageDeadlineSeconds.get()) + .isBefore(now())) { + // Message expired while waiting. We don't extend these messages anymore, + // so it was probably sent to someone else. Don't work on it. + // Don't nack it either, because we'd be nacking someone else's message. + ackHandler.forget(); + return; + } + + receiver.receiveMessage(message, consumer); + } catch (Exception e) { + response.setException(e); + } + } + }; + if (message.getOrderingKey().isEmpty()) { + executor.execute(deliverMessageTask); + } else { + sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask); + } + } if (batchDone) { batchCallback.run(); } diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 785368bb13cb..0f5fb93664b1 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -28,8 +28,10 @@ import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -53,6 +55,7 @@ public void run() { private MessageDispatcher dispatcher; private LinkedBlockingQueue consumers; + private Map> messagesByOrderingKey; private List sentAcks; private List sentModAcks; private FakeClock clock; @@ -72,6 +75,7 @@ static ModAckItem of(String ackId, int seconds) { @Before public void setUp() { consumers = new LinkedBlockingQueue<>(); + messagesByOrderingKey = new HashMap<>(); sentAcks = new ArrayList<>(); sentModAcks = new ArrayList<>(); @@ -79,6 +83,12 @@ public void setUp() { new MessageReceiver() { @Override public void receiveMessage(final PubsubMessage message, final AckReplyConsumer consumer) { + List messages = messagesByOrderingKey.get(message.getOrderingKey()); + if (messages == null) { + messages = new ArrayList<>(); + messagesByOrderingKey.put(message.getOrderingKey(), messages); + } + messages.add(message.getData()); consumers.add(consumer); } }; @@ -205,4 +215,47 @@ public void testDeadlineAdjustment() throws Exception { assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(42); } + + private ReceivedMessage newReceivedMessage(String ackId, String orderingKey, String data) { + return ReceivedMessage.newBuilder() + .setAckId(ackId) + .setMessage( + PubsubMessage.newBuilder() + .setOrderingKey(orderingKey) + .setData(ByteString.copyFromUtf8(data)) + .build()) + .build(); + } + + @Test + public void testOrderingKey() throws Exception { + // Create messages with "orderA". + ReceivedMessage message1 = newReceivedMessage("ackId1", "orderA", "m1"); + ReceivedMessage message2 = newReceivedMessage("ackId2", "orderA", "m2"); + // Create messages with "orderB". + ReceivedMessage message3 = newReceivedMessage("ackId3", "orderB", "m3"); + ReceivedMessage message4 = newReceivedMessage("ackId4", "orderB", "m4"); + ReceivedMessage message5 = newReceivedMessage("ackId5", "orderB", "m5"); + + dispatcher.processReceivedMessages(Collections.singletonList(message1), NOOP_RUNNABLE); + consumers.take().ack(); + dispatcher.processReceivedMessages(Collections.singletonList(message2), NOOP_RUNNABLE); + consumers.take().ack(); + dispatcher.processReceivedMessages(Collections.singletonList(message3), NOOP_RUNNABLE); + consumers.take().ack(); + dispatcher.processReceivedMessages(Collections.singletonList(message4), NOOP_RUNNABLE); + consumers.take().ack(); + dispatcher.processReceivedMessages(Collections.singletonList(message5), NOOP_RUNNABLE); + consumers.take().ack(); + + assertThat(messagesByOrderingKey.get("orderA")) + .containsExactly(ByteString.copyFromUtf8("m1"), ByteString.copyFromUtf8("m2")) + .inOrder(); + assertThat(messagesByOrderingKey.get("orderB")) + .containsExactly( + ByteString.copyFromUtf8("m3"), + ByteString.copyFromUtf8("m4"), + ByteString.copyFromUtf8("m5")) + .inOrder(); + } }