Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ public class Publisher {
private final BatchingSettings batchingSettings;

private final Lock messagesBatchLock;
private List<OutstandingPublish> messagesBatch;
private int batchedBytes;
private MessagesBatch messagesBatch;

private final AtomicBoolean activeAlarm;

Expand Down Expand Up @@ -114,7 +113,7 @@ private Publisher(Builder builder) throws IOException {
this.batchingSettings = builder.batchingSettings;
this.messageTransform = builder.messageTransform;

messagesBatch = new LinkedList<>();
messagesBatch = new MessagesBatch();
messagesBatchLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
executor = builder.executorProvider.getExecutor();
Expand Down Expand Up @@ -205,24 +204,19 @@ public ApiFuture<String> publish(PubsubMessage message) {
// Check if the next message makes the current batch exceed the max batch byte size.
if (!messagesBatch.isEmpty()
&& hasBatchingBytes()
&& batchedBytes + messageSize >= getMaxBatchBytes()) {
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
&& messagesBatch.getBatchedBytes() + messageSize >= getMaxBatchBytes()) {
batchToSend = messagesBatch.popOutstandingBatch();
}

// Border case if the message to send is greater or equals to the max batch size then can't
// be included in the current batch and instead sent immediately.
if (!hasBatchingBytes() || messageSize < getMaxBatchBytes()) {
batchedBytes += messageSize;
messagesBatch.add(outstandingPublish);
messagesBatch.addMessage(outstandingPublish, messageSize);

// If after adding the message we have reached the batch max messages then we have a batch
// to send.
if (messagesBatch.size() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
if (messagesBatch.getMessagesCount() == getBatchingSettings().getElementCountThreshold()) {
batchToSend = messagesBatch.popOutstandingBatch();
}
}
// Setup the next duration based delivery alarm if there are messages batched.
Expand Down Expand Up @@ -301,9 +295,7 @@ public void publishAllOutstanding() {
if (messagesBatch.isEmpty()) {
return;
}
batchToSend = new OutstandingBatch(messagesBatch, batchedBytes);
messagesBatch = new LinkedList<>();
batchedBytes = 0;
batchToSend = messagesBatch.popOutstandingBatch();
} finally {
messagesBatchLock.unlock();
}
Expand Down Expand Up @@ -637,4 +629,37 @@ public Publisher build() throws IOException {
return new Publisher(this);
}
}

private static class MessagesBatch {
private List<OutstandingPublish> messages = new LinkedList<>();
private int batchedBytes;

private OutstandingBatch popOutstandingBatch() {
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
reset();
return batch;
}

private void reset() {
messages = new LinkedList<>();
batchedBytes = 0;
}

private boolean isEmpty() {
return messages.isEmpty();
}

private int getBatchedBytes() {
return batchedBytes;
}

private void addMessage(OutstandingPublish message, int messageSize) {
messages.add(message);
batchedBytes += messageSize;
}

private int getMessagesCount() {
return messages.size();
}
};

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nitpick: semicolon at the end of class definition is not required (and not common) in Java.
Otherwise, this looks good to me.

}