Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsub.v1;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -95,6 +96,7 @@ public class Publisher {
private final List<AutoCloseable> closeables;
private final MessageWaiter messagesWaiter;
private ScheduledFuture<?> currentAlarmFuture;
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
Expand All @@ -110,6 +112,7 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;

this.batchingSettings = builder.batchingSettings;
this.messageTransform = builder.messageTransform;

messagesBatch = new LinkedList<>();
messagesBatchLock = new ReentrantLock();
Expand Down Expand Up @@ -192,6 +195,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
throw new IllegalStateException("Cannot publish on a shut-down publisher.");
}

message = messageTransform.apply(message);
final int messageSize = message.getSerializedSize();
OutstandingBatch batchToSend = null;
SettableApiFuture<String> publishResult = SettableApiFuture.<String>create();
Expand Down Expand Up @@ -528,6 +532,14 @@ public static final class Builder {
CredentialsProvider credentialsProvider =
TopicAdminSettings.defaultCredentialsProviderBuilder().build();

ApiFunction<PubsubMessage, PubsubMessage> messageTransform =
new ApiFunction<PubsubMessage, PubsubMessage>() {
@Override
public PubsubMessage apply(PubsubMessage input) {
return input;
}
};

private Builder(String topic) {
this.topicName = Preconditions.checkNotNull(topic);
}
Expand Down Expand Up @@ -610,6 +622,16 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
return this;
}

/**
* Gives the ability to set an {@link ApiFunction} that will transform the {@link PubsubMessage}
* before it is sent
*/
public Builder setTransform(ApiFunction<PubsubMessage, PubsubMessage> messageTransform) {
this.messageTransform =
Preconditions.checkNotNull(messageTransform, "The messageTransform cannnot be null.");
return this;
}

public Publisher build() throws IOException {
return new Publisher(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.core.ApiFunction;
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.ExecutorProvider;
Expand All @@ -43,6 +44,7 @@
import io.grpc.inprocess.InProcessServerBuilder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -433,6 +435,33 @@ public void testBuilderParametersAndDefaults() {
assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings);
}

@Test
public void testTransformer() throws Exception {
final AtomicBoolean transformed = new AtomicBoolean();
ApiFunction<PubsubMessage, PubsubMessage> transform =
new ApiFunction<PubsubMessage, PubsubMessage>() {
@Override
public PubsubMessage apply(PubsubMessage input) {
transformed.set(true);
return input;
}
};
Publisher publisher =
getTestPublisherBuilder()
.setBatchingSettings(
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
.toBuilder()
.setElementCountThreshold(1L)
.setDelayThreshold(Duration.ofSeconds(100))
.build())
.setTransform(transform)
.build();
publisher.publish(PubsubMessage.getDefaultInstance());
testPublisherServiceImpl.addPublishResponse(PublishResponse.getDefaultInstance());
publisher.shutdown();
assertTrue(transformed.get());
}

@Test
public void testBuilderInvalidArguments() {
Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);
Expand Down