Skip to content
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

package com.google.cloud.pubsub.spi.v1;

import com.google.api.gax.core.FlowController;
import com.google.api.gax.core.ApiClock;
import com.google.api.gax.core.FlowController;
import com.google.api.stats.Distribution;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
Expand All @@ -28,6 +28,7 @@
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -260,23 +261,46 @@ public int getMessageDeadlineSeconds() {
}

public void processReceivedMessages(List<com.google.pubsub.v1.ReceivedMessage> responseMessages) {
int receivedMessagesCount = responseMessages.size();
if (receivedMessagesCount == 0) {
if (responseMessages.isEmpty()) {
return;
}
Instant now = new Instant(clock.millisTime());
int totalByteCount = 0;

final ArrayList<AckHandler> ackHandlers = new ArrayList<>(responseMessages.size());
for (ReceivedMessage pubsubMessage : responseMessages) {
int messageSize = pubsubMessage.getMessage().getSerializedSize();
totalByteCount += messageSize;
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), messageSize));
ackHandlers.add(new AckHandler(pubsubMessage.getAckId(), pubsubMessage.getMessage().getSerializedSize()));

This comment was marked as spam.

This comment was marked as spam.

}

Instant now = new Instant(clock.millisTime());
Instant expiration = now.plus(messageDeadlineSeconds * 1000);
logger.log(
Level.FINER, "Received {0} messages at {1}", new Object[] {responseMessages.size(), now});

synchronized (outstandingAckHandlers) {
// AckDeadlineAlarm modifies lists in outstandingAckHandlers in-place and might run at any time.
// We will also later iterate over ackHandlers when we give messages to user code.
// We must create a new list to pass to outstandingAckHandlers,
// so that we can't iterate and modify the list concurrently.

This comment was marked as spam.

This comment was marked as spam.

outstandingAckHandlers.add(
new ExtensionJob(
expiration,
INITIAL_ACK_DEADLINE_EXTENSION_SECONDS,
new ArrayList<AckHandler>(ackHandlers)));
}
setupNextAckDeadlineExtensionAlarm(expiration);

// Deadline extension must be set up before we reserve flow control.
// Flow control might block for a while, and extension will keep messages from expiring.

This comment was marked as spam.

This comment was marked as spam.


try {
flowController.reserve(responseMessages.size(), totalMessageSize(responseMessages));
} catch (FlowController.FlowControlException e) {
throw new IllegalStateException("Flow control unexpected exception", e);
}
messagesWaiter.incrementPendingMessages(responseMessages.size());

// Reserving flow control must happen before we give the messages to the user,
// otherwise the user code might be given too many messages to process at once.

This comment was marked as spam.

This comment was marked as spam.


Iterator<AckHandler> acksIterator = ackHandlers.iterator();

This comment was marked as spam.

This comment was marked as spam.

for (ReceivedMessage userMessage : responseMessages) {
final PubsubMessage message = userMessage.getMessage();
Expand All @@ -302,18 +326,14 @@ public void run() {
}
});
}
}

synchronized (outstandingAckHandlers) {
outstandingAckHandlers.add(
new ExtensionJob(expiration, INITIAL_ACK_DEADLINE_EXTENSION_SECONDS, ackHandlers));
}
setupNextAckDeadlineExtensionAlarm(expiration);

try {
flowController.reserve(receivedMessagesCount, totalByteCount);
} catch (FlowController.FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
private static int totalMessageSize(Collection<ReceivedMessage> messages) {

This comment was marked as spam.

This comment was marked as spam.

int total = 0;
for (ReceivedMessage message : messages) {
total += message.getMessage().getSerializedSize();
}
return total;
}

private void setupPendingAcksAlarm() {
Expand Down