Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,6 @@
* pull sync <subscription> <maxMessages>
* publish <topic> <message>+
* replace-push-config <subscription> <endpoint>?
* ack <subscription> <ackId>+
* nack <subscription> <ackId>+
* create topic <topic>
* create subscription <topic> <subscription> <endpoint>?
* list subscriptions <topic>?
Expand Down Expand Up @@ -488,22 +486,6 @@ public String params() {
}
}

private abstract static class MessagesAction extends PubSubAction<Tuple<SubscriptionName, List<String>>> {
@Override
Tuple<SubscriptionName, List<String>> parse(String... args) throws Exception {
if (args.length < 2) {
throw new IllegalArgumentException("Missing required subscription and ack IDs");
}
SubscriptionName subscriptionName = SubscriptionName.create(projectId, args[0]);
return Tuple.of(subscriptionName, Arrays.asList(Arrays.copyOfRange(args, 1, args.length)));
}

@Override
public String params() {
return "<subscription> <ackId>+";
}
}

/**
* This class demonstrates how to asynchronously pull messages from a Pub/Sub pull subscription.
* Messages are pulled until a timeout is reached.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class CreateSubscriptionAndPullMessages {

public static void main(String... args) throws Exception {
// [START async_pull_subscription]
TopicName topic = TopicName.create("test-project", "test-topic");
SubscriptionName subscription = SubscriptionName.create("test-project", "test-subscription");

Expand Down Expand Up @@ -62,11 +63,13 @@ public void failed(Subscriber.State from, Throwable failure) {
},
MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();

Thread.sleep(60000);
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
// [END async_pull_subscription]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,61 @@ public String getProjectId() {
}

/** Example of creating a pull subscription for a topic. */
public Subscription createSubscription(String topic, String subscriptionId) throws Exception {
public Subscription createSubscription(String topicId, String subscriptionId) throws Exception {
// [START createSubscription]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// [START createSubscription]
TopicName topicName = TopicName.create(projectId, topic);
// eg. projectId = "my-test-project", topicId = "my-test-topic"
TopicName topicName = TopicName.create(projectId, topicId);
// eg. subscriptionId = "my-test-subscription"
SubscriptionName subscriptionName =
SubscriptionName.create(projectId, subscriptionId);
// create a pull subscription with default acknowledgement deadline
Subscription subscription =
subscriptionAdminClient.createSubscription(
subscriptionName, topicName, PushConfig.getDefaultInstance(), 0);
// [END createSubscription]
return subscription;
}
// [END createSubscription]
}

/** Example of creating a subscription with a push endpoint. */
public Subscription createSubscriptionWithPushEndpoint(String topicId, String subscriptionId, String endpoint)
throws Exception {
// [START createSubscriptionWithPushEndpoint]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
TopicName topicName = TopicName.create(projectId, topicId);
SubscriptionName subscriptionName =
SubscriptionName.create(projectId, subscriptionId);

// eg. endpoint = "https://my-test-project.appspot.com/push"
PushConfig pushConfig = PushConfig.newBuilder().setPushEndpoint(endpoint).build();

// acknowledgement deadline in seconds for the message received over the push endpoint
int ackDeadlineInSeconds = 10;

Subscription subscription =
subscriptionAdminClient.createSubscription(
subscriptionName, topicName, pushConfig, ackDeadlineInSeconds);
return subscription;
}
// [END createSubscriptionWithPushEndpoint]
}

/** Example of replacing the push configuration of a subscription, setting the push endpoint. */
public void replacePushConfig(String subscriptionId, String endpoint) throws Exception {
// [START replacePushConfig]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// [START replacePushConfig]
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
PushConfig pushConfig = PushConfig.newBuilder().setPushEndpoint(endpoint).build();
subscriptionAdminClient.modifyPushConfig(subscriptionName, pushConfig);
// [END replacePushConfig]
}
// [END replacePushConfig]
}

/** Example of listing subscriptions. */
public ListSubscriptionsPagedResponse listSubscriptions() throws Exception {
// [START listSubscriptions]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// [START listSubscriptions]
ListSubscriptionsRequest listSubscriptionsRequest =
ListSubscriptionsRequest.newBuilder()
.setProjectWithProjectName(ProjectName.create(projectId))
Expand All @@ -92,40 +118,40 @@ public ListSubscriptionsPagedResponse listSubscriptions() throws Exception {
for (Subscription subscription : subscriptions) {
// do something with the subscription
}
// [END listSubscriptions]
return response;
}
// [END listSubscriptions]
}

/** Example of deleting a subscription. */
public SubscriptionName deleteSubscription(String subscriptionId) throws Exception {
// [START deleteSubscription]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// [START deleteSubscription]
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
subscriptionAdminClient.deleteSubscription(subscriptionName);
// [END deleteSubscription]
return subscriptionName;
}
// [END deleteSubscription]
}

/** Example of getting a subscription policy. */
public Policy getSubscriptionPolicy(String subscriptionId) throws Exception {
// [START getSubscriptionPolicy]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// [START getSubscriptionPolicy]
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
Policy policy = subscriptionAdminClient.getIamPolicy(subscriptionName.toString());
if (policy == null) {
// subscription was not found
}
// [END getSubscriptionPolicy]
return policy;
}
// [END getSubscriptionPolicy]
}

/** Example of replacing a subscription policy. */
public Policy replaceSubscriptionPolicy(String subscriptionId) throws Exception {
// [START replaceSubscriptionPolicy]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// [START replaceSubscriptionPolicy]
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
Policy policy = subscriptionAdminClient.getIamPolicy(subscriptionName.toString());
// Create a role => members binding
Expand All @@ -138,34 +164,34 @@ public Policy replaceSubscriptionPolicy(String subscriptionId) throws Exception
Policy updatedPolicy = policy.toBuilder().addBindings(binding).build();

updatedPolicy = subscriptionAdminClient.setIamPolicy(subscriptionName.toString(), updatedPolicy);
// [END replaceSubscriptionPolicy]
return updatedPolicy;
}
// [END replaceSubscriptionPolicy]
}

/** Example of testing whether the caller has the provided permissions on a subscription. */
public TestIamPermissionsResponse testSubscriptionPermissions(String subscriptionId)
throws Exception {
// [START testSubscriptionPermissions]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START testSubscriptionPermissions]
List<String> permissions = new LinkedList<>();
permissions.add("pubsub.subscriptions.get");
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
TestIamPermissionsResponse testedPermissions =
topicAdminClient.testIamPermissions(subscriptionName.toString(), permissions);
// [END testSubscriptionPermissions]
return testedPermissions;
}
// [END testSubscriptionPermissions]
}

/** Example of getting a subscription. */
public Subscription getSubscription(String subscriptionId) throws Exception {
// [START getSubscription]
try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) {
// [START getSubscription]
SubscriptionName subscriptionName = SubscriptionName.create(projectId, subscriptionId);
Subscription subscription = subscriptionAdminClient.getSubscription(subscriptionName);
// [END getSubscription]
return subscription;
}
// [END getSubscription]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,19 @@ public String getProjectId() {

/** Example of creating a topic. */
public Topic createTopic(String topicId) throws Exception {
// [START createTopic]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START createTopic]
TopicName topicName = TopicName.create(projectId, topicId);
Topic topic = topicAdminClient.createTopic(topicName);
// [END createTopic]
return topic;
}
// [END createTopic]
}

/** Example of listing topics. */
public ListTopicsPagedResponse listTopics() throws Exception {
// [START listTopics]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START listTopics]
ListTopicsRequest listTopicsRequest =
ListTopicsRequest.newBuilder()
.setProjectWithProjectName(ProjectName.create(projectId))
Expand All @@ -70,16 +70,16 @@ public ListTopicsPagedResponse listTopics() throws Exception {
for (Topic topic : topics) {
// do something with the topic
}
// [END listTopics]
return response;
}
// [END listTopics]
}

/** Example of listing topics for a subscription. */
public ListTopicSubscriptionsPagedResponse listTopicSubscriptions(String topicId)
throws Exception {
// [START listTopicSubscriptions]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START listTopicSubscriptions]
TopicName topicName = TopicName.create(projectId, topicId);
ListTopicSubscriptionsRequest request =
ListTopicSubscriptionsRequest.newBuilder()
Expand All @@ -91,40 +91,40 @@ public ListTopicSubscriptionsPagedResponse listTopicSubscriptions(String topicId
for (String subscriptionName : subscriptionNames) {
// do something with the subscription name
}
// [END listTopicSubscriptions]
return response;
}
// [END listTopicSubscriptions]
}

/** Example of deleting a topic. */
public TopicName deleteTopic(String topicId) throws Exception {
// [START deleteTopic]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START deleteTopic]
TopicName topicName = TopicName.create(projectId, topicId);
topicAdminClient.deleteTopic(topicName);
// [END deleteTopic]
return topicName;
}
// [END deleteTopic]
}

/** Example of getting a topic policy. */
public Policy getTopicPolicy(String topicId) throws Exception {
// [START getTopicPolicy]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START getTopicPolicy]
TopicName topicName = TopicName.create(projectId, topicId);
Policy policy = topicAdminClient.getIamPolicy(topicName.toString());
if (policy == null) {
// topic iam policy was not found
}
// [END getTopicPolicy]
return policy;
}
// [END getTopicPolicy]
}

/** Example of replacing a topic policy. */
public Policy replaceTopicPolicy(String topicId) throws Exception {
// [START replaceTopicPolicy]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START replaceTopicPolicy]
String topicName = TopicName.create(projectId, topicId).toString();
Policy policy = topicAdminClient.getIamPolicy(topicName);
// add role -> members binding
Expand All @@ -136,34 +136,34 @@ public Policy replaceTopicPolicy(String topicId) throws Exception {
// create updated policy
Policy updatedPolicy = Policy.newBuilder(policy).addBindings(binding).build();
updatedPolicy = topicAdminClient.setIamPolicy(topicName, updatedPolicy);
// [END replaceTopicPolicy]
return updatedPolicy;
}
// [END replaceTopicPolicy]
}

/** Example of testing whether the caller has the provided permissions on a topic.
* Only viewer, editor or admin/owner can view results of pubsub.topics.get */
public TestIamPermissionsResponse testTopicPermissions(String topicId) throws Exception {
// [START testTopicPermissions]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START testTopicPermissions]
List<String> permissions = new LinkedList<>();
permissions.add("pubsub.topics.get");
TopicName topicName = TopicName.create(projectId, topicId);
TestIamPermissionsResponse testedPermissions =
topicAdminClient.testIamPermissions(topicName.toString(), permissions);
// [END testTopicPermissions]
return testedPermissions;
}
// [END testTopicPermissions]
}

/** Example of getting a topic. */
public Topic getTopic(String topicId) throws Exception {
// [START getTopic]
try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
// [START getTopic]
TopicName topicName = TopicName.create(projectId, topicId);
Topic topic = topicAdminClient.getTopic(topicName);
// [END createTopic]
return topic;
}
// [END getTopic]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,19 @@ public void setUp() throws Exception {
Cleanup.deleteTestTopicsAndSubscriptions(projectId, topics, subscriptions);
}

private Subscription createSubscription(String topicName, String subscriptionName)
throws Exception {
@Test
public void createSubscriptionWithPushIsSuccessful() throws Exception {
String topicName = topics[0];
String subscriptionName = subscriptions[0];
createTopic(topicName);
String endpoint = "https://" + projectId + ".appspot.com/push";
Subscription subscription =
subscriptionAdminClientSnippets.createSubscription(topicName, subscriptionName);
subscriptionAdminClientSnippets.createSubscriptionWithPushEndpoint(topicName, subscriptionName, endpoint);
assertNotNull(subscription);
Subscription retrievedSubscription = subscriptionAdminClientSnippets.getSubscription(subscriptionName);
assertNotNull(retrievedSubscription);
assertEquals(subscription.getName(), retrievedSubscription.getName());
return subscription;
assertEquals(subscription.getPushConfig().getPushEndpoint(), endpoint);
}

@Test
Expand Down Expand Up @@ -168,17 +171,16 @@ private void createTopic(String name) throws Exception {
}
}

private Set<String> publishMessages(String topicName, int numMessages) throws Exception {
Set<String> messages = new HashSet<>();
Publisher publisher = Publisher.defaultBuilder(TopicName.create(projectId, topicName)).build();
for (int i = 1; i<= numMessages; i++) {
String message = formatForTest("message-" + i);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(
ByteString.copyFromUtf8(message)).build();
publisher.publish(pubsubMessage);
messages.add(message);
}
return messages;
private Subscription createSubscription(String topicName, String subscriptionName)
throws Exception {
createTopic(topicName);
Subscription subscription =
subscriptionAdminClientSnippets.createSubscription(topicName, subscriptionName);
assertNotNull(subscription);
Subscription retrievedSubscription = subscriptionAdminClientSnippets.getSubscription(subscriptionName);
assertNotNull(retrievedSubscription);
assertEquals(subscription.getName(), retrievedSubscription.getName());
return subscription;
}

@After
Expand Down