From 442f09b6c7b1c498aff79922359e9a68740186d7 Mon Sep 17 00:00:00 2001 From: Solomon Duskis Date: Thu, 28 Feb 2019 18:38:08 -0500 Subject: [PATCH] Adding a pubsub publisher message transformer This will allow generic transformations of messages before a message is published. --- .../com/google/cloud/pubsub/v1/Publisher.java | 22 ++++++++++++++ .../cloud/pubsub/v1/PublisherImplTest.java | 29 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java index 557a483073de..ac0074aa48c6 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub.v1; +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; @@ -95,6 +96,7 @@ public class Publisher { private final List closeables; private final MessageWaiter messagesWaiter; private ScheduledFuture currentAlarmFuture; + private final ApiFunction messageTransform; /** The maximum number of messages in one request. Defined by the API. */ public static long getApiMaxRequestElementCount() { @@ -110,6 +112,7 @@ private Publisher(Builder builder) throws IOException { topicName = builder.topicName; this.batchingSettings = builder.batchingSettings; + this.messageTransform = builder.messageTransform; messagesBatch = new LinkedList<>(); messagesBatchLock = new ReentrantLock(); @@ -192,6 +195,7 @@ public ApiFuture publish(PubsubMessage message) { throw new IllegalStateException("Cannot publish on a shut-down publisher."); } + message = messageTransform.apply(message); final int messageSize = message.getSerializedSize(); OutstandingBatch batchToSend = null; SettableApiFuture publishResult = SettableApiFuture.create(); @@ -528,6 +532,14 @@ public static final class Builder { CredentialsProvider credentialsProvider = TopicAdminSettings.defaultCredentialsProviderBuilder().build(); + ApiFunction messageTransform = + new ApiFunction() { + @Override + public PubsubMessage apply(PubsubMessage input) { + return input; + } + }; + private Builder(String topic) { this.topicName = Preconditions.checkNotNull(topic); } @@ -610,6 +622,16 @@ public Builder setExecutorProvider(ExecutorProvider executorProvider) { return this; } + /** + * Gives the ability to set an {@link ApiFunction} that will transform the {@link PubsubMessage} + * before it is sent + */ + public Builder setTransform(ApiFunction messageTransform) { + this.messageTransform = + Preconditions.checkNotNull(messageTransform, "The messageTransform cannnot be null."); + return this; + } + public Publisher build() throws IOException { return new Publisher(this); } diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java index 2901ae4b0a9d..c4b2eaac855d 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import com.google.api.core.ApiFunction; import com.google.api.core.ApiFuture; import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.ExecutorProvider; @@ -43,6 +44,7 @@ import io.grpc.inprocess.InProcessServerBuilder; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -433,6 +435,33 @@ public void testBuilderParametersAndDefaults() { assertEquals(Publisher.Builder.DEFAULT_RETRY_SETTINGS, builder.retrySettings); } + @Test + public void testTransformer() throws Exception { + final AtomicBoolean transformed = new AtomicBoolean(); + ApiFunction transform = + new ApiFunction() { + @Override + public PubsubMessage apply(PubsubMessage input) { + transformed.set(true); + return input; + } + }; + Publisher publisher = + getTestPublisherBuilder() + .setBatchingSettings( + Publisher.Builder.DEFAULT_BATCHING_SETTINGS + .toBuilder() + .setElementCountThreshold(1L) + .setDelayThreshold(Duration.ofSeconds(100)) + .build()) + .setTransform(transform) + .build(); + publisher.publish(PubsubMessage.getDefaultInstance()); + testPublisherServiceImpl.addPublishResponse(PublishResponse.getDefaultInstance()); + publisher.shutdown(); + assertTrue(transformed.get()); + } + @Test public void testBuilderInvalidArguments() { Publisher.Builder builder = Publisher.newBuilder(TEST_TOPIC);