Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import com.google.pubsub.v1.SubscriptionName;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
Expand Down Expand Up @@ -105,6 +106,7 @@ class MessageDispatcher {
private final Distribution ackLatencyDistribution;

private final String subscriptionName;
private final SubscriptionName subscriptionNameObject;
private final boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);

Expand Down Expand Up @@ -226,6 +228,7 @@ private MessageDispatcher(Builder builder) {
sequentialExecutor = new SequentialExecutorService.AutoExecutor(builder.executor);

subscriptionName = builder.subscriptionName;
subscriptionNameObject = SubscriptionName.parse(builder.subscriptionName);
enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
if (builder.tracer != null) {
tracer = builder.tracer;
Expand Down Expand Up @@ -408,7 +411,7 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
message.getMessage(),
subscriptionName,
subscriptionNameObject,
message.getAckId(),
message.getDeliveryAttempt())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class Publisher implements PublisherInterface {

private final String topicName;
private final int topicNameSize;
private final TopicName topicNameObject;

private final BatchingSettings batchingSettings;
private final boolean enableMessageOrdering;
Expand Down Expand Up @@ -149,6 +150,7 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;
topicNameSize =
CodedOutputStream.computeStringSize(PublishRequest.TOPIC_FIELD_NUMBER, this.topicName);
topicNameObject = TopicName.parse(this.topicName);

this.batchingSettings = builder.batchingSettings;
FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings();
Expand Down Expand Up @@ -282,7 +284,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
+ "setEnableMessageOrdering(true) in the builder.");

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicName).build();
PubsubMessageWrapper.newBuilder(messageTransform.apply(message), topicNameObject).build();
tracer.startPublisherSpan(messageWrapper);

final OutstandingPublish outstandingPublish = new OutstandingPublish(messageWrapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ private PubsubMessageWrapper(Builder builder) {
this.deliveryAttempt = builder.deliveryAttempt;
}

static Builder newBuilder(PubsubMessage message, String topicName) {
static Builder newBuilder(PubsubMessage message, TopicName topicName) {
return new Builder(message, topicName);
}

static Builder newBuilder(
PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) {
PubsubMessage message, SubscriptionName subscriptionName, String ackId, int deliveryAttempt) {
return new Builder(message, subscriptionName, ackId, deliveryAttempt);
}

Expand Down Expand Up @@ -395,21 +395,9 @@ static final class Builder {
private String ackId = null;
private int deliveryAttempt = 0;

public Builder(PubsubMessage message, String topicName) {
public Builder(PubsubMessage message, TopicName topicName) {
this.message = message;
if (topicName != null) {
this.topicName = TopicName.parse(topicName);
}
}

public Builder(
PubsubMessage message, String subscriptionName, String ackId, int deliveryAttempt) {
this.message = message;
if (subscriptionName != null) {
this.subscriptionName = SubscriptionName.parse(subscriptionName);
}
this.ackId = ackId;
this.deliveryAttempt = deliveryAttempt;
this.topicName = topicName;
}

public Builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testPublishSpansSuccess() {
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build();
List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);

long messageSize = messageWrapper.getPubsubMessage().getData().size();
Expand Down Expand Up @@ -218,7 +218,7 @@ public void testPublishFlowControlSpanFailure() {
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build();

Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);
Expand Down Expand Up @@ -258,7 +258,7 @@ public void testPublishRpcSpanFailure() {
openTelemetryTesting.clearSpans();

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build();

List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);
Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
Expand Down Expand Up @@ -302,16 +302,15 @@ public void testSubscribeSpansSuccess() {
OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(openTelemetryTracer, true);

PubsubMessageWrapper publishMessageWrapper =
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME.toString()).build();
PubsubMessageWrapper.newBuilder(getPubsubMessage(), FULL_TOPIC_NAME).build();
// Initialize the Publisher span to inject the context in the message
tracer.startPublisherSpan(publishMessageWrapper);
tracer.endPublisherSpan(publishMessageWrapper);

PubsubMessage publishedMessage =
publishMessageWrapper.getPubsubMessage().toBuilder().setMessageId(MESSAGE_ID).build();
PubsubMessageWrapper subscribeMessageWrapper =
PubsubMessageWrapper.newBuilder(
publishedMessage, FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, 1)
PubsubMessageWrapper.newBuilder(publishedMessage, FULL_SUBSCRIPTION_NAME, ACK_ID, 1)
.build();
List<PubsubMessageWrapper> subscribeMessageWrappers = Arrays.asList(subscribeMessageWrapper);

Expand Down Expand Up @@ -518,7 +517,7 @@ public void testSubscribeConcurrencyControlSpanFailure() {

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT)
.build();

Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
Expand Down Expand Up @@ -562,7 +561,7 @@ public void testSubscriberSpanFailure() {

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT)
.build();

Tracer openTelemetryTracer = openTelemetryTesting.getOpenTelemetry().getTracer("test");
Expand Down Expand Up @@ -595,7 +594,7 @@ public void testSubscribeRpcSpanFailures() {

PubsubMessageWrapper messageWrapper =
PubsubMessageWrapper.newBuilder(
getPubsubMessage(), FULL_SUBSCRIPTION_NAME.toString(), ACK_ID, DELIVERY_ATTEMPT)
getPubsubMessage(), FULL_SUBSCRIPTION_NAME, ACK_ID, DELIVERY_ATTEMPT)
.build();
List<PubsubMessageWrapper> messageWrappers = Arrays.asList(messageWrapper);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_topic_with_aws_msk_ingestion]

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithAwsMskIngestionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// AWS MSK ingestion settings.
String clusterArn = "cluster-arn";
String mskTopic = "msk-topic";
String awsRoleArn = "aws-role-arn";
String gcpServiceAccount = "gcp-service-account";

createTopicWithAwsMskIngestionExample(
projectId, topicId, clusterArn, mskTopic, awsRoleArn, gcpServiceAccount);
}

public static void createTopicWithAwsMskIngestionExample(
String projectId,
String topicId,
String clusterArn,
String mskTopic,
String awsRoleArn,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);

IngestionDataSourceSettings.AwsMsk awsMsk =
IngestionDataSourceSettings.AwsMsk.newBuilder()
.setClusterArn(clusterArn)
.setTopic(mskTopic)
.setAwsRoleArn(awsRoleArn)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setAwsMsk(awsMsk).build();

Topic topic =
topicAdminClient.createTopic(
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build());

System.out.println("Created topic with AWS MSK ingestion settings: " + topic.getAllFields());
}
}
}
// [END pubsub_create_topic_with_aws_msk_ingestion]
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub;

// [START pubsub_create_topic_with_azure_event_hubs_ingestion]

import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

public class CreateTopicWithAzureEventHubsIngestionExample {
public static void main(String... args) throws Exception {
// TODO(developer): Replace these variables before running the sample.
String projectId = "your-project-id";
String topicId = "your-topic-id";
// Azure Event Hubs ingestion settings.
String resourceGroup = "resource-group";
String namespace = "namespace";
String eventHub = "event-hub";
String clientId = "client-id";
String tenantId = "tenant-id";
String subscriptionId = "subscription-id";
String gcpServiceAccount = "gcp-service-account";

createTopicWithAzureEventHubsIngestionExample(
projectId,
topicId,
resourceGroup,
namespace,
eventHub,
clientId,
tenantId,
subscriptionId,
gcpServiceAccount);
}

public static void createTopicWithAzureEventHubsIngestionExample(
String projectId,
String topicId,
String resourceGroup,
String namespace,
String eventHub,
String clientId,
String tenantId,
String subscriptionId,
String gcpServiceAccount)
throws IOException {
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
TopicName topicName = TopicName.of(projectId, topicId);

IngestionDataSourceSettings.AzureEventHubs azureEventHubs =
IngestionDataSourceSettings.AzureEventHubs.newBuilder()
.setResourceGroup(resourceGroup)
.setNamespace(namespace)
.setEventHub(eventHub)
.setClientId(clientId)
.setTenantId(tenantId)
.setSubscriptionId(subscriptionId)
.setGcpServiceAccount(gcpServiceAccount)
.build();
IngestionDataSourceSettings ingestionDataSourceSettings =
IngestionDataSourceSettings.newBuilder().setAzureEventHubs(azureEventHubs).build();

Topic topic =
topicAdminClient.createTopic(
Topic.newBuilder()
.setName(topicName.toString())
.setIngestionDataSourceSettings(ingestionDataSourceSettings)
.build());

System.out.println(
"Created topic with Azure Event Hubs ingestion settings: " + topic.getAllFields());
}
}
}
// [END pubsub_create_topic_with_azure_event_hubs_ingestion]
Loading
Loading