Skip to content
Closed
Show file tree
Hide file tree
Changes from 56 commits
Commits
Show all changes
66 commits
Select commit Hold shift + click to select a range
41adb13
Merge from master. (#4468)
kamalaboulhosn Feb 8, 2019
7793aa3
Merge branch 'pubsub-ordering-keys' of github.com:GoogleCloudPlatform…
sduskis Feb 8, 2019
41430b2
Add SequentialExecutorService that assists Pub/Sub to publish message…
kimkyung-goog Feb 8, 2019
81ea157
Add ordered publishing support to Cloud Pub/Sub (#4474)
kamalaboulhosn Feb 15, 2019
fab0188
Pubsub ordering keys subscriber (#4515)
kamalaboulhosn Feb 20, 2019
858d4e9
Change the order of publishing batches when a large message is reques…
kimkyung-goog Mar 4, 2019
c2c8b5c
Merge from master; Resolve conflicts (#4943)
kimkyung-goog Apr 15, 2019
ad54d69
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 15, 2019
e026c5c
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 15, 2019
cc44d4b
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 16, 2019
e492b04
Update Publisher.java
sduskis Apr 16, 2019
3f9a663
Fixing format
sduskis Apr 16, 2019
e88432a
Merge branch 'pubsub-ordering-keys' of github.com:GoogleCloudPlatform…
sduskis Apr 16, 2019
7732506
Refactoring of the Pub/Sub Ordering keys client (#4962)
sduskis Apr 17, 2019
b68add0
[WIP] Refactoring SequentialExecutorService (#4969)
sduskis Apr 17, 2019
54420a4
Running the formatter.
sduskis Apr 18, 2019
7f96c1a
Merge remote-tracking branch 'upstream/master' into merge_master
sduskis Apr 18, 2019
6be9e3d
Merged with the formatting changes. (#4978)
sduskis Apr 18, 2019
7487e88
Fixng a bad merge.
sduskis Apr 18, 2019
708d785
Refactoring SequentialExecutorService.java (#4979)
sduskis Apr 18, 2019
d556ce4
Cleaning up generics in SequentialExecutorService (#4982)
sduskis Apr 18, 2019
4a26abe
Adding comments to CallbackExecutor.submit (#4981)
sduskis Apr 18, 2019
18aca86
Exposing AutoExecutor and CallbackExecutor directly (#4983)
sduskis Apr 18, 2019
b06fd9e
More refactoring to SequentialExecutor (#4984)
sduskis Apr 19, 2019
40e83a1
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 22, 2019
43f2ff3
Merge branch 'pubsub-ordering-keys' of github.com:googleapis/google-c…
sduskis Apr 22, 2019
d661492
SequentialExecutorService.callNextTaskAsync only uses key (#4992)
sduskis Apr 22, 2019
dd8a5ce
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 22, 2019
97ba17c
SequentialExecutorService now uses generics for Runnables.
sduskis Apr 22, 2019
02e06ae
Using a Queue instead of Deque.
sduskis Apr 22, 2019
c90b83c
Renaming a variable in Publisher
sduskis Apr 22, 2019
956537c
More refactoring to SequentialExecutorService.
sduskis Apr 23, 2019
75acad9
Running the formatter
sduskis Apr 24, 2019
5480455
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 25, 2019
d98f981
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 25, 2019
3bdbf77
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 25, 2019
41b9975
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 28, 2019
42697e2
Reformatting the publisher
sduskis Apr 28, 2019
fd07e9e
Pub/Sub: publishAll defers sending batches.
sduskis Apr 29, 2019
74daec8
Reverting last change to publishAllOutstanding
sduskis Apr 30, 2019
dd8db2e
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 30, 2019
9259fe6
Merge branch 'master' into pubsub-ordering-keys
sduskis Apr 30, 2019
8573e5a
Cleaning up messageBatch entries
sduskis May 1, 2019
ec2ecc6
Alarms now wait to publish until previous bundle completes
sduskis May 1, 2019
0b00228
Using Preconditions in Publisher.
sduskis May 1, 2019
142196c
Merge branch 'master' into pubsub-ordering-keys
sduskis May 1, 2019
92a7bf4
The Publisher's callback should happen second
sduskis May 1, 2019
32f4e23
Merge branch 'master' into pubsub-ordering-keys
sduskis May 1, 2019
f6fcbed
Fixing a flaky test
sduskis May 1, 2019
c918164
Adding resume publish. (#5046)
sduskis May 2, 2019
6073e7e
Fixing formatting issues
sduskis May 2, 2019
5a81613
Adding comments
sduskis May 3, 2019
e9e754c
Fixing formatting
sduskis May 13, 2019
33ebb40
Ordering keys setter is now package private
sduskis May 15, 2019
cec8e94
Cleanup before merge to main line
sduskis May 15, 2019
2400120
Merge branch 'master' into pubsub-ordering-keys
sduskis May 23, 2019
52c7c93
Merge branch 'master' into pubsub-ordering-keys
sduskis May 29, 2019
e463a05
Fixing lint
sduskis May 29, 2019
c01aa11
Merge branch 'master' into pubsub-ordering-keys
sduskis May 31, 2019
3087968
publish() doesn't use an executor.
sduskis May 31, 2019
da6dd3c
Moving the publish RPC call into the lock
sduskis May 31, 2019
ede8436
Updating GAX in clients to 1.45.0 for docs.
sduskis Jun 5, 2019
ae78886
Merge remote-tracking branch 'upstream/master'
sduskis Jun 11, 2019
99d1396
Merge remote-tracking branch 'upstream/master'
sduskis Jun 11, 2019
cf6d528
Merge remote-tracking branch 'upstream/master'
sduskis Jun 11, 2019
3ea3dc9
Merge branch 'master' into pubsub-ordering-keys
sduskis Jun 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class MessageDispatcher {
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);

private final Executor executor;
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
private final ScheduledExecutorService systemExecutor;
private final ApiClock clock;

Expand Down Expand Up @@ -205,6 +206,7 @@ void sendAckOperations(
jobLock = new ReentrantLock();
messagesWaiter = new MessageWaiter();
this.clock = clock;
this.sequentialExecutor = new SequentialExecutorService.AutoExecutor(executor);
}

void start() {
Expand Down Expand Up @@ -349,7 +351,7 @@ public void nack() {
}
};
ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor());
executor.execute(
Runnable deliverMessageTask =
new Runnable() {
@Override
public void run() {
Expand All @@ -370,7 +372,12 @@ public void run() {
response.setException(e);
}
}
});
};
if (message.getOrderingKey().isEmpty()) {
executor.execute(deliverMessageTask);
} else {
sequentialExecutor.submit(message.getOrderingKey(), deliverMessageTask);
}
}

/** Compute the ideal deadline, set subsequent modacks to this deadline, and return it. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@
import com.google.pubsub.v1.TopicNames;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -85,15 +89,17 @@ public class Publisher {
private final String topicName;

private final BatchingSettings batchingSettings;
private final boolean enableMessageOrdering;

private final Lock messagesBatchLock;
private MessagesBatch messagesBatch;
final Map<String, MessagesBatch> messagesBatches;

private final AtomicBoolean activeAlarm;

private final PublisherStub publisherStub;

private final ScheduledExecutorService executor;
final SequentialExecutorService.CallbackExecutor sequentialExecutor;
private final AtomicBoolean shutdown;
private final BackgroundResource backgroundResources;
private final MessageWaiter messagesWaiter;
Expand All @@ -114,22 +120,33 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;

this.batchingSettings = builder.batchingSettings;
this.enableMessageOrdering = builder.enableMessageOrdering;
this.messageTransform = builder.messageTransform;

messagesBatch = new MessagesBatch(batchingSettings);
messagesBatches = new HashMap<>();
messagesBatchLock = new ReentrantLock();
activeAlarm = new AtomicBoolean(false);
executor = builder.executorProvider.getExecutor();
sequentialExecutor = new SequentialExecutorService.CallbackExecutor(executor);
List<BackgroundResource> backgroundResourceList = new ArrayList<>();
if (builder.executorProvider.shouldAutoClose()) {
backgroundResourceList.add(new ExecutorAsBackgroundResource(executor));
}

// Publisher used to take maxAttempt == 0 to mean infinity, but to GAX it means don't retry.
// We post-process this here to keep backward-compatibility.
RetrySettings retrySettings = builder.retrySettings;
if (retrySettings.getMaxAttempts() == 0) {
retrySettings = retrySettings.toBuilder().setMaxAttempts(Integer.MAX_VALUE).build();
// Also, if "message ordering" is enabled, the publisher should retry sending the failed
// message infinitely rather than sending the next one.
RetrySettings.Builder retrySettingsBuilder = builder.retrySettings.toBuilder();
if (retrySettingsBuilder.getMaxAttempts() == 0) {
retrySettingsBuilder.setMaxAttempts(Integer.MAX_VALUE);
}
if (enableMessageOrdering) {
// TODO: is there a way to have the default retry settings for requests without an ordering
// key?
retrySettingsBuilder
.setMaxAttempts(Integer.MAX_VALUE)
.setTotalTimeout(Duration.ofNanos(Long.MAX_VALUE));
}

PublisherStubSettings.Builder stubSettings =
Expand All @@ -147,7 +164,7 @@ private Publisher(Builder builder) throws IOException {
StatusCode.Code.RESOURCE_EXHAUSTED,
StatusCode.Code.UNKNOWN,
StatusCode.Code.UNAVAILABLE)
.setRetrySettings(retrySettings)
.setRetrySettings(retrySettingsBuilder.build())
.setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build());
this.publisherStub = GrpcPublisherStub.create(stubSettings.build());
backgroundResourceList.add(publisherStub);
Expand Down Expand Up @@ -194,13 +211,27 @@ public String getTopicNameString() {
public ApiFuture<String> publish(PubsubMessage message) {
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");

final String orderingKey = message.getOrderingKey();
Preconditions.checkState(
orderingKey.isEmpty() || enableMessageOrdering,
"Cannot publish a message with an ordering key when message ordering is not enabled.");

final OutstandingPublish outstandingPublish =
new OutstandingPublish(messageTransform.apply(message));
List<OutstandingBatch> batchesToSend;
messagesBatchLock.lock();
try {
// Check if the next message makes the current batch exceed the max batch byte size.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this comment can be removed now.

MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
if (messagesBatch == null) {
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
messagesBatches.put(orderingKey, messagesBatch);
}

batchesToSend = messagesBatch.add(outstandingPublish);
// Setup the next duration based delivery alarm if there are messages batched.
if (!batchesToSend.isEmpty() && messagesBatch.isEmpty()) {
messagesBatches.remove(orderingKey);
}
setupAlarm();
} finally {
messagesBatchLock.unlock();
Expand All @@ -209,6 +240,10 @@ public ApiFuture<String> publish(PubsubMessage message) {
messagesWaiter.incrementPendingMessages(1);

if (!batchesToSend.isEmpty()) {
// TODO: if this is not an ordering keys scenario, will this do anything?
publishAllWithoutInflight();

// TODO: if this is an ordering keys scenario, is this safe without messagesBatchLock?
Copy link

@kimkyung-goog kimkyung-goog May 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this safe without messagesBatchLock?

Since we are assuming publish() for the same ordering key is not called by multiple threads, I think it would be safe without the lock. If users use multiple threads for publishing an ordering key, we cannot guarantee the order anyway. However, it is always possible that I am missing something. WDYT? (But, the executor below should still be removed. Please see my another comment below)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we are assuming this. People could call publish for the same ordering key from multiple threads. It just means that the order among those two messages can not be known by the publisher. It will still be established (and will in fact be established in the order in which they are sent from the publisher client to Cloud Pub/Sub.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, users can definitely use threads for the same ordering key if they want, and we should support it. I think what I wanted to say was that if a user calls publish() for the same ordering key concurrently using threads without a proper synchronization, then there is no ordering in that behavior anyway.
I actually looked into the code again, and I think I was wrong. We should get messagesBatchLock. Here is the scenario:
(1) By messagesBatch.add(outstandingPublish) at line #231, outstandingPublish is added to messages, and an old batch is returned.
(2) Before the returned batch is published, publishAllWithoutInflight() is called by the timer.
(3) publishAllWithoutInflight() publishes the message added to the current batch at (1). Now, the new message is published before the old batch is published.

for (final OutstandingBatch batch : batchesToSend) {
logger.log(Level.FINER, "Scheduling a batch for immediate sending.");
executor.execute(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that this execute() call was removed in the original implementation of ordering keys, but was added back when master is merged to this branch. Please see 3bdbf77#diff-c482c19af8c46aea82546fa17236dfd2

publishOutstandingBatch() should not be called by the executor here because it can reorder sequence of invoking the function. Inside publishOutstandingBatch(), we run an executor, a regular one for non-ordered messages and the sequential executor for ordered messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woops. I'll change it back. What do you think about moving the publish* methods inside the lock?

Expand All @@ -224,8 +259,22 @@ public void run() {
return outstandingPublish.publishResult;
}

/**
* There may be non-recoverable problems with a request for an ordering key. In that case, all
* subsequent requests will fail until this method is called. If the key is not currently paused,
* calling this method will be a no-op.
*
* @param key The key for which to resume publishing.
*/
// TODO: make this public when Ordering keys is live
@BetaApi
void resumePublish(String key) {
Preconditions.checkState(!shutdown.get(), "Cannot publish on a shut-down publisher.");
sequentialExecutor.resumePublish(key);
}

private void setupAlarm() {
if (!messagesBatch.isEmpty()) {
if (!messagesBatches.isEmpty()) {
if (!activeAlarm.getAndSet(true)) {
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
logger.log(Level.FINER, "Setting up alarm for the next {0} ms.", delayThresholdMs);
Expand All @@ -236,7 +285,7 @@ private void setupAlarm() {
public void run() {
logger.log(Level.FINER, "Sending messages based on schedule.");
activeAlarm.getAndSet(false);
publishAllOutstanding();
publishAllWithoutInflight();
}
},
delayThresholdMs,
Expand All @@ -257,16 +306,51 @@ public void run() {
*/
public void publishAllOutstanding() {
messagesBatchLock.lock();
OutstandingBatch batchToSend;
try {
if (messagesBatch.isEmpty()) {
return;
for (MessagesBatch batch : messagesBatches.values()) {
if (!batch.isEmpty()) {
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If
// it's released, the order of publishing cannot be guaranteed if `publish()` is called
// while this function is running. This locking mechanism needs to be improved if it
// causes any performance degradation.
publishOutstandingBatch(batch.popOutstandingBatch());
}
}
messagesBatches.clear();
} finally {
messagesBatchLock.unlock();
}
}

/**
* Publish any outstanding batches if non-empty and there are no other batches in flight. This
* method sends buffered messages, but does not wait for the send operations to complete. To wait
* for messages to send, call {@code get} on the futures returned from {@code publish}.
*/
private void publishAllWithoutInflight() {
messagesBatchLock.lock();
try {
Iterator<Map.Entry<String, MessagesBatch>> it = messagesBatches.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, MessagesBatch> entry = it.next();
MessagesBatch batch = entry.getValue();
String key = entry.getKey();
if (batch.isEmpty()) {
it.remove();
} else if (key.isEmpty() || !sequentialExecutor.hasTasksInflight(key)) {
// TODO(kimkyung-goog): Do not release `messagesBatchLock` when publishing a batch. If

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I think this comment is confusing. Can you remove it? I think you already addressed the potential performance issue in the TODO comment below.

// it's released, the order of publishing cannot be guaranteed if `publish()` is called
// while this function is running. This locking mechanism needs to be improved if it
// causes any performance degradation.

// TODO: Will this cause a performance problem for non-ordering keys scenarios?
Copy link

@kimkyung-goog kimkyung-goog May 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, your comment is for the non-ordering scenarios. Can you also add the potential performance issue related to locking mechanism here? We grab a lock for the entire batch before the for loop, which means no messages can be added when this function is running. One of the typical solutions might be getting a reader lock for the message batches before the for loop and getting a writer lock for each batch before calling publishOutstandingBatch(batch.popOutstandingBatch()).
I am not sure if this will make it better or even worse. It will probably depends on how many ordering keys are involved. Any ideas?

publishOutstandingBatch(batch.popOutstandingBatch());
it.remove();
}
}
batchToSend = messagesBatch.popOutstandingBatch();
} finally {
messagesBatchLock.unlock();
}
publishOutstandingBatch(batchToSend);
}

private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch) {
Expand All @@ -280,12 +364,12 @@ private ApiFuture<PublishResponse> publishCall(OutstandingBatch outstandingBatch
}

private void publishOutstandingBatch(final OutstandingBatch outstandingBatch) {
ApiFutureCallback<PublishResponse> futureCallback =
final ApiFutureCallback<PublishResponse> futureCallback =
new ApiFutureCallback<PublishResponse>() {
@Override
public void onSuccess(PublishResponse result) {
try {
if (result.getMessageIdsCount() != outstandingBatch.size()) {
if (result == null || result.getMessageIdsCount() != outstandingBatch.size()) {
outstandingBatch.onFailure(
new IllegalStateException(
String.format(
Expand All @@ -311,20 +395,37 @@ public void onFailure(Throwable t) {
}
};

ApiFutures.addCallback(publishCall(outstandingBatch), futureCallback, directExecutor());
ApiFuture<PublishResponse> future;
if (outstandingBatch.orderingKey == null || outstandingBatch.orderingKey.isEmpty()) {
future = publishCall(outstandingBatch);
} else {
// If ordering key is specified, publish the batch using the sequential executor.
future =
sequentialExecutor.submit(
outstandingBatch.orderingKey,
new Callable<ApiFuture<PublishResponse>>() {
public ApiFuture<PublishResponse> call() {
return publishCall(outstandingBatch);
}
});
}
ApiFutures.addCallback(future, futureCallback, directExecutor());
}

private static final class OutstandingBatch {
final List<OutstandingPublish> outstandingPublishes;
final long creationTime;
int attempt;
int batchSizeBytes;
final String orderingKey;

OutstandingBatch(List<OutstandingPublish> outstandingPublishes, int batchSizeBytes) {
OutstandingBatch(
List<OutstandingPublish> outstandingPublishes, int batchSizeBytes, String orderingKey) {
this.outstandingPublishes = outstandingPublishes;
attempt = 1;
creationTime = System.currentTimeMillis();
this.batchSizeBytes = batchSizeBytes;
this.orderingKey = orderingKey;
}

int size() {
Expand Down Expand Up @@ -468,7 +569,7 @@ public static final class Builder {
.setRpcTimeoutMultiplier(2)
.setMaxRpcTimeout(DEFAULT_RPC_TIMEOUT)
.build();

static final boolean DEFAULT_ENABLE_MESSAGE_ORDERING = false;
private static final int THREADS_PER_CPU = 5;
static final ExecutorProvider DEFAULT_EXECUTOR_PROVIDER =
InstantiatingExecutorProvider.newBuilder()
Expand All @@ -482,6 +583,8 @@ public static final class Builder {

RetrySettings retrySettings = DEFAULT_RETRY_SETTINGS;

private boolean enableMessageOrdering = DEFAULT_ENABLE_MESSAGE_ORDERING;

private TransportChannelProvider channelProvider =
TopicAdminSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1).build();

Expand Down Expand Up @@ -576,6 +679,14 @@ public Builder setRetrySettings(RetrySettings retrySettings) {
return this;
}

/** Sets the message ordering option. */
// TODO: make this public when Ordering keys is live
@BetaApi
Builder setEnableMessageOrdering(boolean enableMessageOrdering) {
this.enableMessageOrdering = enableMessageOrdering;
return this;
}

/** Gives the ability to set a custom executor to be used by the library. */
public Builder setExecutorProvider(ExecutorProvider executorProvider) {
this.executorProvider = Preconditions.checkNotNull(executorProvider);
Expand All @@ -601,15 +712,17 @@ public Publisher build() throws IOException {
private static class MessagesBatch {
private List<OutstandingPublish> messages;
private int batchedBytes;
private String orderingKey;
private final BatchingSettings batchingSettings;

public MessagesBatch(BatchingSettings batchingSettings) {
private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
this.batchingSettings = batchingSettings;
this.orderingKey = orderingKey;
reset();
}

private OutstandingBatch popOutstandingBatch() {
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes);
OutstandingBatch batch = new OutstandingBatch(messages, batchedBytes, orderingKey);
reset();
return batch;
}
Expand Down
Loading