Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.batching.FlowController.FlowControlException;
import com.google.api.gax.core.Distribution;
import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
Expand All @@ -33,14 +32,11 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -91,9 +87,6 @@ class MessageDispatcher {
private final Lock jobLock;
private ScheduledFuture<?> backgroundJob;

private final LinkedBlockingDeque<OutstandingMessageBatch> outstandingMessageBatches =
new LinkedBlockingDeque<>();

// To keep track of number of seconds the receiver takes to process messages.
private final Distribution ackLatencyDistribution;

Expand Down Expand Up @@ -155,7 +148,6 @@ private void forget() {
}
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
processOutstandingBatches();
}

@Override
Expand Down Expand Up @@ -296,50 +288,19 @@ int getMessageDeadlineSeconds() {
return messageDeadlineSeconds.get();
}

static class OutstandingMessageBatch {
private final Deque<OutstandingMessage> messages;
private final Runnable doneCallback;

static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
this.ackHandler = ackHandler;
}

public ReceivedMessage receivedMessage() {
return receivedMessage;
}

public AckHandler ackHandler() {
return ackHandler;
}
}
static class OutstandingMessage {
private final ReceivedMessage receivedMessage;
private final AckHandler ackHandler;

public OutstandingMessageBatch(Runnable doneCallback) {
this.messages = new LinkedList<>();
this.doneCallback = doneCallback;
}

public void addMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.messages.add(new OutstandingMessage(receivedMessage, ackHandler));
}

public Deque<OutstandingMessage> messages() {
return messages;
public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) {
this.receivedMessage = receivedMessage;
this.ackHandler = ackHandler;
}
}

public void processReceivedMessages(List<ReceivedMessage> messages, Runnable doneCallback) {
if (messages.isEmpty()) {
doneCallback.run();
return;
}

public void processReceivedMessages(List<ReceivedMessage> messages) {
Instant totalExpiration = now().plus(maxAckExtensionPeriod);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
List<OutstandingMessage> outstandingBatch = new ArrayList<>(messages.size());
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(
Expand All @@ -355,42 +316,25 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
// totally expire so that pubsub service sends us the message again.
continue;
}
outstandingBatch.addMessage(message, ackHandler);
outstandingBatch.add(new OutstandingMessage(message, ackHandler));
pendingReceipts.add(message.getAckId());
}

if (outstandingBatch.messages.isEmpty()) {
doneCallback.run();
return;
}

messagesWaiter.incrementPendingMessages(outstandingBatch.messages.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can messagesWaiter be removed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it is needed to block shutdown on outstanding messages in the stop method.

public void stop() {
messagesWaiter.waitNoMessages();
jobLock.lock();
try {
if (backgroundJob != null) {
backgroundJob.cancel(false);
backgroundJob = null;
}
} finally {
jobLock.unlock();
}
processOutstandingAckOperations();
}

outstandingMessageBatches.add(outstandingBatch);
processOutstandingBatches();
processBatch(outstandingBatch);
}

private void processOutstandingBatches() {
for (OutstandingMessageBatch nextBatch = outstandingMessageBatches.poll();
nextBatch != null;
nextBatch = outstandingMessageBatches.poll()) {
for (OutstandingMessage nextMessage = nextBatch.messages.poll();
nextMessage != null;
nextMessage = nextBatch.messages.poll()) {
try {
// This is a non-blocking flow controller.
flowController.reserve(1, nextMessage.receivedMessage.getMessage().getSerializedSize());
} catch (FlowController.MaxOutstandingElementCountReachedException
| FlowController.MaxOutstandingRequestBytesReachedException flowControlException) {
// Unwind previous changes in the batches outstanding.
nextBatch.messages.addFirst(nextMessage);
outstandingMessageBatches.addFirst(nextBatch);
return;
} catch (FlowControlException unexpectedException) {
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
processOutstandingMessage(nextMessage.receivedMessage.getMessage(), nextMessage.ackHandler);
private void processBatch(List<OutstandingMessage> batch) {
messagesWaiter.incrementPendingMessages(batch.size());
for (OutstandingMessage message : batch) {
// This is a blocking flow controller. We have already incremented MessageWaiter, so
// shutdown will block on processing of all these messages anyway.
try {
flowController.reserve(1, message.receivedMessage.getMessage().getSerializedSize());
} catch (FlowControlException unexpectedException) {
// This should be a blocking flow controller and never throw an exception.
throw new IllegalStateException("Flow control unexpected exception", unexpectedException);
}
nextBatch.doneCallback.run();
processOutstandingMessage(message.receivedMessage.getMessage(), message.ackHandler);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,26 +151,20 @@ public void onStart(StreamController controller) {
@Override
public void onResponse(StreamingPullResponse response) {
channelReconnectBackoffMillis.set(INITIAL_CHANNEL_RECONNECT_BACKOFF.toMillis());
messageDispatcher.processReceivedMessages(
response.getReceivedMessagesList(),
new Runnable() {
@Override
public void run() {
// Only request more if we're not shutdown.
// If errorFuture is done, the stream has either failed or hung up,
// and we don't need to request.
if (isAlive() && !errorFuture.isDone()) {
lock.lock();
try {
thisController.request(1);
} catch (Exception e) {
logger.log(Level.WARNING, "cannot request more messages", e);
} finally {
lock.unlock();
}
}
}
});
messageDispatcher.processReceivedMessages(response.getReceivedMessagesList());
// Only request more if we're not shutdown.
// If errorFuture is done, the stream has either failed or hung up,
// and we don't need to request.
if (isAlive() && !errorFuture.isDone()) {
lock.lock();
try {
thisController.request(1);
} catch (Exception e) {
logger.log(Level.WARNING, "cannot request more messages", e);
} finally {
lock.unlock();
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private Subscriber(Builder builder) {
builder
.flowControlSettings
.toBuilder()
.setLimitExceededBehavior(LimitExceededBehavior.ThrowException)
.setLimitExceededBehavior(LimitExceededBehavior.Block)
.build());

this.numPullers = builder.parallelPullCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public void sendAckOperations(
new FlowController(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
.build());

dispatcher =
Expand All @@ -124,31 +124,31 @@ public void sendAckOperations(

@Test
public void testReceipt() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.processOutstandingAckOperations();
assertThat(sentModAcks)
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
}

@Test
public void testAck() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
consumers.take().ack();
dispatcher.processOutstandingAckOperations();
assertThat(sentAcks).contains(TEST_MESSAGE.getAckId());
}

@Test
public void testNack() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
consumers.take().nack();
dispatcher.processOutstandingAckOperations();
assertThat(sentModAcks).contains(ModAckItem.of(TEST_MESSAGE.getAckId(), 0));
}

@Test
public void testExtension() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.extendDeadlines();
assertThat(sentModAcks)
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
Expand All @@ -161,7 +161,7 @@ public void testExtension() throws Exception {

@Test
public void testExtension_Close() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.extendDeadlines();
assertThat(sentModAcks)
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
Expand All @@ -176,7 +176,7 @@ public void testExtension_Close() throws Exception {

@Test
public void testExtension_GiveUp() throws Exception {
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
dispatcher.extendDeadlines();
assertThat(sentModAcks)
.contains(ModAckItem.of(TEST_MESSAGE.getAckId(), Subscriber.MIN_ACK_DEADLINE_SECONDS));
Expand All @@ -188,7 +188,7 @@ public void testExtension_GiveUp() throws Exception {
dispatcher.extendDeadlines();
assertThat(sentModAcks).isEmpty();

// We should be able to reserve another item in the flow controller and not block shutdown
// We should be able to reserve another item in the flow controller and not block.
flowController.reserve(1, 0);
dispatcher.stop();
}
Expand All @@ -197,7 +197,7 @@ public void testExtension_GiveUp() throws Exception {
public void testDeadlineAdjustment() throws Exception {
assertThat(dispatcher.computeDeadlineSeconds()).isEqualTo(10);

dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE), NOOP_RUNNABLE);
dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE));
clock.advance(42, TimeUnit.SECONDS);
consumers.take().ack();

Expand Down