Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class MessageDispatcher {
private final MessageWaiter messagesWaiter;

// Maps ID to "total expiration time". If it takes longer than this, stop extending.
private final ConcurrentMap<String, Instant> pendingMessages = new ConcurrentHashMap<>();
private final ConcurrentMap<AckHandler, Instant> pendingMessages = new ConcurrentHashMap<>();

private final LinkedBlockingQueue<String> pendingAcks = new LinkedBlockingQueue<>();
private final LinkedBlockingQueue<String> pendingNacks = new LinkedBlockingQueue<>();
Expand Down Expand Up @@ -141,7 +141,7 @@ private class AckHandler implements FutureCallback<AckReply> {
}

private void onBoth(LinkedBlockingQueue<String> destination) {
pendingMessages.remove(ackId);
pendingMessages.remove(this);
destination.add(ackId);
flowController.release(1, outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
Expand Down Expand Up @@ -329,17 +329,15 @@ public void processReceivedMessages(List<ReceivedMessage> messages, Runnable don
}
messagesWaiter.incrementPendingMessages(messages.size());

Instant totalExpiration = now().plus(maxAckExtensionPeriod);
for (ReceivedMessage message : messages) {
pendingReceipts.add(message.getAckId());
pendingMessages.put(message.getAckId(), totalExpiration);
}

Instant totalExpiration = now().plus(maxAckExtensionPeriod);
OutstandingMessageBatch outstandingBatch = new OutstandingMessageBatch(doneCallback);
for (ReceivedMessage message : messages) {
AckHandler ackHandler =
new AckHandler(message.getAckId(), message.getMessage().getSerializedSize());
outstandingBatch.addMessage(message, ackHandler);
pendingReceipts.add(message.getAckId());
pendingMessages.put(ackHandler, totalExpiration);
}
synchronized (outstandingMessageBatches) {
outstandingMessageBatches.add(outstandingBatch);
Expand Down Expand Up @@ -436,10 +434,10 @@ void extendDeadlines() {
Instant extendTo = now.plusSeconds(extendSeconds);

int count = 0;
Iterator<Map.Entry<String, Instant>> it = pendingMessages.entrySet().iterator();
Iterator<Map.Entry<AckHandler, Instant>> it = pendingMessages.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, Instant> entry = it.next();
String ackId = entry.getKey();
Map.Entry<AckHandler, Instant> entry = it.next();
String ackId = entry.getKey().ackId;
Instant totalExpiration = entry.getValue();
// TODO(pongad): PendingModifyAckDeadline is created to dance around polling pull,
// since one modack RPC only takes one expiration.
Expand All @@ -455,6 +453,9 @@ void extendDeadlines() {
int sec = Math.max(1, (int) now.until(totalExpiration, ChronoUnit.SECONDS));
modacks.add(new PendingModifyAckDeadline(sec, ackId));
count++;
} else {
flowController.release(1, entry.getKey().outstandingBytes);
messagesWaiter.incrementPendingMessages(-1);
}
}
modacks.add(modack);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public void run() {
private List<String> sentAcks;
private List<ModAckItem> sentModAcks;
private FakeClock clock;
private FlowController flowController;

@AutoValue
abstract static class ModAckItem {
Expand Down Expand Up @@ -101,6 +102,12 @@ public void sendAckOperations(
systemExecutor.shutdownNow();

clock = new FakeClock();
flowController =
new FlowController(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(1L)
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.ThrowException)
.build());

dispatcher =
new MessageDispatcher(
Expand All @@ -109,7 +116,7 @@ public void sendAckOperations(
Duration.ofSeconds(5),
Duration.ofMinutes(60),
new Distribution(Subscriber.MAX_ACK_DEADLINE_SECONDS + 1),
new FlowController(FlowControlSettings.newBuilder().build()),
flowController,
new LinkedList<MessageDispatcher.OutstandingMessageBatch>(),
MoreExecutors.directExecutor(),
systemExecutor,
Expand Down Expand Up @@ -182,6 +189,10 @@ public void testExtension_GiveUp() throws Exception {
clock.advance(1, TimeUnit.DAYS);
dispatcher.extendDeadlines();
assertThat(sentModAcks).isEmpty();

// We should be able to reserve another item in the flow controller and not block shutdown
flowController.reserve(1, 0);

This comment was marked as spam.

dispatcher.stop();
}

@Test
Expand Down