diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java
index 65ed737fc0cd..a2b348f4e91d 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/Topic.java
@@ -30,7 +30,13 @@
import java.util.concurrent.Future;
/**
- * PubSub Topic.
+ * A Google Cloud Pub/Sub topic. A topic is a named resource to which messages are sent by
+ * publishers. Subscribers can receive messages sent to a topic by creating subscriptions.
+ * {@code Topic} adds a layer of service-related functionality over {@link TopicInfo}. Objects of
+ * this class are immutable. To get a {@code Topic} object with the most recent information use
+ * {@link #reload} or {@link #reloadAsync}.
+ *
+ * @see Pub/Sub Data Model
*/
public class Topic extends TopicInfo {
@@ -39,6 +45,9 @@ public class Topic extends TopicInfo {
private final PubSubOptions options;
private transient PubSub pubsub;
+ /**
+ * A builder for {@code Topic} objects.
+ */
public static final class Builder extends TopicInfo.Builder {
private final PubSub pubsub;
@@ -73,70 +82,173 @@ public Builder toBuilder() {
}
@Override
- public int hashCode() {
+ public final int hashCode() {
return Objects.hash(options, super.hashCode());
}
@Override
- public boolean equals(Object obj) {
- if (this == obj) {
+ public final boolean equals(Object obj) {
+ if (obj == this) {
return true;
}
- if (obj == null || getClass() != obj.getClass()) {
+ if (obj == null || !obj.getClass().equals(Topic.class)) {
return false;
}
Topic other = (Topic) obj;
- return Objects.equals(name(), other.name()) && Objects.equals(options, other.options);
+ return baseEquals(other) && Objects.equals(options, other.options);
}
+ /**
+ * Returns the topic's {@code PubSub} object used to issue requests.
+ */
public PubSub pubSub() {
return pubsub;
}
+ /**
+ * Deletes this topic.
+ *
+ * @return {@code true} if the topic was deleted, {@code false} if it was not found
+ * @throws PubSubException upon failure
+ */
public boolean delete() {
return pubsub.deleteTopic(name());
}
+ /**
+ * Sends a request for deleting this topic. This method returns a {@code Future} object to consume
+ * the result. {@link Future#get()} returns {@code true} if the topic was deleted, {@code false}
+ * if it was not found.
+ *
+ * @throws PubSubException upon failure
+ */
public Future deleteAsync() {
return pubsub.deleteTopicAsync(name());
}
+ /**
+ * Fetches current topic's latest information. Returns {@code null} if the topic does not exist.
+ *
+ * @return a {@code Topic} object with latest information or {@code null} if not found
+ * @throws PubSubException upon failure
+ */
public Topic reload() {
return pubsub.getTopic(name());
}
+ /**
+ * Sends a request to fetch current topic's latest information. This method returns a
+ * {@code Future} object to consume the result. {@link Future#get()} returns a {@code Topic}
+ * object with latest information or {@code null} if not found.
+ *
+ * @throws PubSubException upon failure
+ */
public Future reloadAsync() {
return pubsub.getTopicAsync(name());
}
+ /**
+ * Publishes a message to this topic. This method returns a service-generated id for the published
+ * message. Service-generated ids are guaranteed to be unique within the topic.
+ *
+ * @param message the message to publish
+ * @return a unique service-generated id for the message
+ * @throws PubSubException upon failure, if the topic does not exist or if the message has empty
+ * payload and no attributes
+ */
public String publish(Message message) {
return pubsub.publish(name(), message);
}
+ /**
+ * Sends a request for publishing a message to the this topic. This method returns a
+ * {@code Future} object to consume the result. {@link Future#get()} returns a service-generated
+ * id for the published message. Service-generated ids are guaranteed to be unique within the
+ * topic.
+ *
+ * @param message the message to publish
+ * @return a {@code Future} for the unique service-generated id for the message
+ */
public Future publishAsync(Message message) {
return pubsub.publishAsync(name(), message);
}
+ /**
+ * Publishes a number of messages to this topic. This method returns a list of service-generated
+ * ids for the published messages. Service-generated ids are guaranteed to be unique within the
+ * topic.
+ *
+ * @param message the first message to publish
+ * @param messages other messages to publish
+ * @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
+ * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
+ * empty payload and no attributes
+ */
public List publish(Message message, Message... messages) {
return pubsub.publish(name(), message, messages);
}
+ /**
+ * Sends a request to publish a number of messages to this topic. This method returns a
+ * {@code Future} object to consume the result. {@link Future#get()} returns a list of
+ * service-generated ids for the published messages. Service-generated ids are guaranteed to be
+ * unique within the topic.
+ *
+ * @param message the first message to publish
+ * @param messages other messages to publish
+ * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
+ * the messages.
+ */
public Future> publishAsync(Message message, Message... messages) {
return pubsub.publishAsync(name(), message, messages);
}
+ /**
+ * Publishes a number of messages to this topic. This method returns a list ofservice-generated
+ * ids for the published messages. Service-generated ids are guaranteed to be unique within the
+ * topic.
+ *
+ * @param messages the messages to publish
+ * @return a list of unique, service-generated, ids. Ids are in the same order as the messages.
+ * @throws PubSubException upon failure, if the topic does not exist or if one of the messages has
+ * empty payload and no attributes
+ */
public List publish(Iterable messages) {
return pubsub.publish(name(), messages);
}
+ /**
+ * Sends a request to publish a number of messages to this topic. This method returns a
+ * {@code Future} object to consume the result. {@link Future#get()} returns a list of
+ * service-generated ids for the published messages. Service-generated ids are guaranteed to be
+ * unique within the topic.
+ *
+ * @param messages the messages to publish
+ * @return a {@code Future} for the unique, service-generated ids. Ids are in the same order as
+ * the messages.
+ */
public Future> publishAsync(Iterable messages) {
return pubsub.publishAsync(name(), messages);
}
+ /**
+ * Lists the identities of the subscriptions for this topic. This method returns a {@link Page}
+ * object that can be used to consume paginated results. Use {@link ListOption} to specify the
+ * page size or the page token from which to start listing subscriptions.
+ *
+ * @throws PubSubException upon failure
+ */
public Page listSubscriptions(ListOption... options) {
return pubsub.listSubscriptions(name(), options);
}
+ /**
+ * Sends a request for listing the identities of subscriptions for this topic. This method returns
+ * a {@code Future} object to consume the result. {@link Future#get()} returns an
+ * {@link AsyncPage} object that can be used to asynchronously handle paginated results. Use
+ * {@link ListOption} to specify the page size or the page token from which to start listing
+ * subscriptions.
+ */
public Future> listSubscriptionsAsync(ListOption... options) {
return pubsub.listSubscriptionsAsync(name(), options);
}
@@ -146,9 +258,9 @@ private void readObject(ObjectInputStream input) throws IOException, ClassNotFou
this.pubsub = options.service();
}
- static Topic fromPb(PubSub storage, com.google.pubsub.v1.Topic topicPb) {
+ static Topic fromPb(PubSub pubsub, com.google.pubsub.v1.Topic topicPb) {
TopicInfo topicInfo = TopicInfo.fromPb(topicPb);
- return new Topic(storage, new BuilderImpl(topicInfo));
+ return new Topic(pubsub, new BuilderImpl(topicInfo));
}
static Function fromPbFunction(final PubSub pubsub) {
diff --git a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java
index 8c90a0c2705d..e1b2dc6275a0 100644
--- a/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java
+++ b/gcloud-java-pubsub/src/main/java/com/google/cloud/pubsub/TopicInfo.java
@@ -98,15 +98,16 @@ public int hashCode() {
return Objects.hash(name);
}
+ final boolean baseEquals(TopicInfo topicInfo) {
+ return Objects.equals(name, topicInfo.name);
+ }
+
@Override
public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || !obj.getClass().equals(this.getClass())) {
- return false;
- }
- return Objects.equals(name, ((TopicInfo) obj).name);
+ return obj == this
+ || obj != null
+ && obj.getClass().equals(TopicInfo.class)
+ && baseEquals((TopicInfo) obj);
}
@Override
diff --git a/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java
new file mode 100644
index 000000000000..911c6ec0d627
--- /dev/null
+++ b/gcloud-java-pubsub/src/test/java/com/google/cloud/pubsub/TopicTest.java
@@ -0,0 +1,320 @@
+/*
+ * Copyright 2016 Google Inc. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsub;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.createStrictMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.cloud.AsyncPage;
+import com.google.cloud.AsyncPageImpl;
+import com.google.cloud.Page;
+import com.google.cloud.PageImpl;
+import com.google.cloud.pubsub.PubSub.ListOption;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.Futures;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+public class TopicTest {
+
+ private static final String NAME = "topic";
+ private static final TopicInfo TOPIC_INFO = TopicInfo.of(NAME);
+
+ private final PubSub serviceMockReturnsOptions = createStrictMock(PubSub.class);
+ private final PubSubOptions mockOptions = createMock(PubSubOptions.class);
+ private PubSub pubsub;
+ private Topic expectedTopic;
+ private Topic topic;
+
+ private void initializeExpectedTopic(int optionsCalls) {
+ expect(serviceMockReturnsOptions.options()).andReturn(mockOptions).times(optionsCalls);
+ replay(serviceMockReturnsOptions);
+ pubsub = createStrictMock(PubSub.class);
+ expectedTopic = new Topic(serviceMockReturnsOptions, new Topic.BuilderImpl(TOPIC_INFO));
+ }
+
+ private void initializeTopic() {
+ topic = new Topic(pubsub, new Topic.BuilderImpl(TOPIC_INFO));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ verify(pubsub, serviceMockReturnsOptions);
+ }
+
+ @Test
+ public void testBuilder() {
+ initializeExpectedTopic(2);
+ replay(pubsub);
+ Topic builtTopic = expectedTopic.toBuilder().name("newTopic").build();
+ assertEquals("newTopic", builtTopic.name());
+ }
+
+ @Test
+ public void testToBuilder() {
+ initializeExpectedTopic(2);
+ replay(pubsub);
+ compareTopic(expectedTopic, expectedTopic.toBuilder().build());
+ }
+
+ @Test
+ public void testReload() {
+ initializeExpectedTopic(2);
+ TopicInfo updatedInfo = TOPIC_INFO.toBuilder().name("newTopic").build();
+ Topic expectedTopic =
+ new Topic(serviceMockReturnsOptions, new TopicInfo.BuilderImpl(updatedInfo));
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.getTopic(NAME)).andReturn(expectedTopic);
+ replay(pubsub);
+ initializeTopic();
+ Topic updatedTopic = topic.reload();
+ compareTopic(expectedTopic, updatedTopic);
+ }
+
+ @Test
+ public void testReloadNull() {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.getTopic(NAME)).andReturn(null);
+ replay(pubsub);
+ initializeTopic();
+ assertNull(topic.reload());
+ }
+
+ @Test
+ public void testReloadAsync() throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(2);
+ TopicInfo updatedInfo = TOPIC_INFO.toBuilder().name("newTopic").build();
+ Topic expectedTopic =
+ new Topic(serviceMockReturnsOptions, new TopicInfo.BuilderImpl(updatedInfo));
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.getTopicAsync(NAME))
+ .andReturn(Futures.immediateFuture(expectedTopic));
+ replay(pubsub);
+ initializeTopic();
+ Topic updatedTopic = topic.reloadAsync().get();
+ compareTopic(expectedTopic, updatedTopic);
+ }
+
+ @Test
+ public void testReloadAsyncNull() throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.getTopicAsync(NAME)).andReturn(Futures.immediateFuture(null));
+ replay(pubsub);
+ initializeTopic();
+ assertNull(topic.reloadAsync().get());
+ }
+
+ @Test
+ public void testDeleteTrue() {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.deleteTopic(NAME)).andReturn(true);
+ replay(pubsub);
+ initializeTopic();
+ assertTrue(topic.delete());
+ }
+
+ @Test
+ public void testDeleteFalse() {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.deleteTopic(NAME)).andReturn(false);
+ replay(pubsub);
+ initializeTopic();
+ assertFalse(topic.delete());
+ }
+
+ @Test
+ public void testDeleteAsyncTrue() throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.deleteTopicAsync(NAME)).andReturn(Futures.immediateFuture(true));
+ replay(pubsub);
+ initializeTopic();
+ assertTrue(topic.deleteAsync().get());
+ }
+
+ @Test
+ public void testDeleteAsyncFalse() throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ expect(pubsub.deleteTopicAsync(NAME)).andReturn(Futures.immediateFuture(false));
+ replay(pubsub);
+ initializeTopic();
+ assertFalse(topic.deleteAsync().get());
+ }
+
+ @Test
+ public void testPublishOneMessage() {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ Message message = Message.of("payload1");
+ String messageId = "messageId";
+ expect(pubsub.publish(NAME, message)).andReturn(messageId);
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(messageId, topic.publish(message));
+ }
+
+ @Test
+ public void testPublishOneMessageAsync() throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ Message message = Message.of("payload1");
+ String messageId = "messageId";
+ expect(pubsub.publishAsync(NAME, message))
+ .andReturn(Futures.immediateFuture(messageId));
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(messageId, topic.publishAsync(message).get());
+ }
+
+ @Test
+ public void testPublishMoreMessages() {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ Message message1 = Message.of("payload1");
+ Message message2 = Message.of("payload2");
+ List messageIds = ImmutableList.of("messageId1", "messageId2");
+ expect(pubsub.publish(NAME, message1, message2)).andReturn(messageIds);
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(messageIds, topic.publish(message1, message2));
+ }
+
+ @Test
+ public void testPublishMoreMessagesAsync() throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ Message message1 = Message.of("payload1");
+ Message message2 = Message.of("payload2");
+ List messageIds = ImmutableList.of("messageId1", "messageId2");
+ expect(pubsub.publishAsync(NAME, message1, message2))
+ .andReturn(Futures.immediateFuture(messageIds));
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(messageIds, topic.publishAsync(message1, message2).get());
+ }
+
+ @Test
+ public void testPublishMessageList() {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ Message message1 = Message.of("payload1");
+ Message message2 = Message.of("payload2");
+ List messages = ImmutableList.of(message1, message2);
+ List messageIds = ImmutableList.of("messageId1", "messageId2");
+ expect(pubsub.publish(NAME, messages)).andReturn(messageIds);
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(messageIds, topic.publish(messages));
+ }
+
+ @Test
+ public void testPublishMessageListAsync() throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ Message message1 = Message.of("payload1");
+ Message message2 = Message.of("payload2");
+ List messages = ImmutableList.of(message1, message2);
+ List messageIds = ImmutableList.of("messageId1", "messageId2");
+ expect(pubsub.publishAsync(NAME, messages))
+ .andReturn(Futures.immediateFuture(messageIds));
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(messageIds, topic.publishAsync(messages).get());
+ }
+
+ @Test
+ public void testListSubscriptions() {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ final List subscriptions = ImmutableList.of(
+ new SubscriptionId("project", "subscription1"),
+ new SubscriptionId("project", "subscription2"));
+ Page result = new PageImpl<>(null, null, subscriptions);
+ expect(pubsub.listSubscriptions(NAME)).andReturn(result);
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(subscriptions, topic.listSubscriptions().values());
+ }
+
+ @Test
+ public void testListSubscriptionsWithOptions() {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ final List subscriptions = ImmutableList.of(
+ new SubscriptionId("project", "subscription1"),
+ new SubscriptionId("project", "subscription2"));
+ Page result = new PageImpl<>(null, null, subscriptions);
+ expect(pubsub.listSubscriptions(NAME, ListOption.pageSize(42))).andReturn(result);
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(subscriptions, topic.listSubscriptions(ListOption.pageSize(42)).values());
+ }
+
+ @Test
+ public void testListSubscriptionsAsync() throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ final List subscriptions = ImmutableList.of(
+ new SubscriptionId("project", "subscription1"),
+ new SubscriptionId("project", "subscription2"));
+ AsyncPage result = new AsyncPageImpl<>(null, null, subscriptions);
+ expect(pubsub.listSubscriptionsAsync(NAME))
+ .andReturn(Futures.immediateFuture(result));
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(subscriptions, topic.listSubscriptionsAsync().get().values());
+ }
+
+ @Test
+ public void testListSubscriptionsAsyncWithOptions()
+ throws ExecutionException, InterruptedException {
+ initializeExpectedTopic(1);
+ expect(pubsub.options()).andReturn(mockOptions);
+ final List subscriptions = ImmutableList.of(
+ new SubscriptionId("project", "subscription1"),
+ new SubscriptionId("project", "subscription2"));
+ AsyncPage result = new AsyncPageImpl<>(null, null, subscriptions);
+ expect(pubsub.listSubscriptionsAsync(NAME, ListOption.pageSize(42)))
+ .andReturn(Futures.immediateFuture(result));
+ replay(pubsub);
+ initializeTopic();
+ assertEquals(subscriptions,
+ topic.listSubscriptionsAsync(ListOption.pageSize(42)).get().values());
+ }
+
+ private void compareTopic(Topic expected, Topic value) {
+ assertEquals(expected, value);
+ assertEquals(expected.name(), value.name());
+ assertEquals(expected.hashCode(), value.hashCode());
+ }
+}