diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherClient.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherClient.java index e1cd043a439f..0b7ca15db947 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherClient.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/PublisherClient.java @@ -20,7 +20,6 @@ import com.google.api.gax.grpc.ChannelAndExecutor; import com.google.api.gax.grpc.UnaryCallable; -import com.google.api.gax.protobuf.PathTemplate; import com.google.iam.v1.GetIamPolicyRequest; import com.google.iam.v1.Policy; import com.google.iam.v1.SetIamPolicyRequest; @@ -53,6 +52,8 @@ * Service Description: The service that an application uses to manipulate topics, and to send * messages to a topic. * + *

To publish messages to a topic, see the Publisher class. + * *

This class provides the ability to make remote calls to the backing service through method * calls that map to API methods. Sample code to get started: * @@ -127,39 +128,6 @@ public class PublisherClient implements AutoCloseable { private final UnaryCallable testIamPermissionsCallable; - private static final PathTemplate PROJECT_PATH_TEMPLATE = - PathTemplate.createWithoutUrlEncoding("projects/{project}"); - - private static final PathTemplate TOPIC_PATH_TEMPLATE = - PathTemplate.createWithoutUrlEncoding("projects/{project}/topics/{topic}"); - - /** Formats a string containing the fully-qualified path to represent a project resource. */ - public static final String formatProjectName(String project) { - return PROJECT_PATH_TEMPLATE.instantiate("project", project); - } - - /** Formats a string containing the fully-qualified path to represent a topic resource. */ - public static final String formatTopicName(String project, String topic) { - return TOPIC_PATH_TEMPLATE.instantiate( - "project", project, - "topic", topic); - } - - /** Parses the project from the given fully-qualified path which represents a project resource. */ - public static final String parseProjectFromProjectName(String projectName) { - return PROJECT_PATH_TEMPLATE.parse(projectName).get("project"); - } - - /** Parses the project from the given fully-qualified path which represents a topic resource. */ - public static final String parseProjectFromTopicName(String topicName) { - return TOPIC_PATH_TEMPLATE.parse(topicName).get("project"); - } - - /** Parses the topic from the given fully-qualified path which represents a topic resource. */ - public static final String parseTopicFromTopicName(String topicName) { - return TOPIC_PATH_TEMPLATE.parse(topicName).get("topic"); - } - /** Constructs an instance of PublisherClient with default settings. */ public static final PublisherClient create() throws IOException { return create(PublisherSettings.defaultBuilder().build()); @@ -327,11 +295,13 @@ public final UnaryCallable createTopicCallable() { * } * * - * @param topic The messages in the request will be published on this topic. + * @param topic The messages in the request will be published on this topic. Format is + * `projects/{project}/topics/{topic}`. * @param messages The messages to publish. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - public final PublishResponse publish(TopicName topic, List messages) { + /* package-private */ final PublishResponse publish( + TopicName topic, List messages) { PublishRequest request = PublishRequest.newBuilder().setTopicWithTopicName(topic).addAllMessages(messages).build(); @@ -395,7 +365,7 @@ public final PublishResponse publish(PublishRequest request) { * } * */ - public final UnaryCallable publishCallable() { + /* package-private */ final UnaryCallable publishCallable() { return publishCallable; } @@ -412,7 +382,7 @@ public final UnaryCallable publishCallable() { * } * * - * @param topic The name of the topic to get. + * @param topic The name of the topic to get. Format is `projects/{project}/topics/{topic}`. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final Topic getTopic(TopicName topic) { @@ -481,7 +451,8 @@ public final UnaryCallable getTopicCallable() { * } * * - * @param project The name of the cloud project that topics belong to. + * @param project The name of the cloud project that topics belong to. Format is + * `projects/{project}`. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final ListTopicsPagedResponse listTopics(ProjectName project) { @@ -585,7 +556,8 @@ public final UnaryCallable listTopicsCall * } * * - * @param topic The name of the topic that subscriptions are attached to. + * @param topic The name of the topic that subscriptions are attached to. Format is + * `projects/{project}/topics/{topic}`. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final ListTopicSubscriptionsPagedResponse listTopicSubscriptions(TopicName topic) { @@ -693,7 +665,7 @@ public final ListTopicSubscriptionsPagedResponse listTopicSubscriptions( * } * * - * @param topic Name of the topic to delete. + * @param topic Name of the topic to delete. Format is `projects/{project}/topics/{topic}`. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final void deleteTopic(TopicName topic) { @@ -762,7 +734,7 @@ public final UnaryCallable deleteTopicCallable() { * *


    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   Policy policy = Policy.newBuilder().build();
    *   Policy response = publisherClient.setIamPolicy(formattedResource, policy);
    * }
@@ -777,7 +749,7 @@ public final UnaryCallable deleteTopicCallable() {
    * @throws com.google.api.gax.grpc.ApiException if the remote call fails
    */
   public final Policy setIamPolicy(String resource, Policy policy) {
-    TOPIC_PATH_TEMPLATE.validate(resource, "setIamPolicy");
+
     SetIamPolicyRequest request =
         SetIamPolicyRequest.newBuilder().setResource(resource).setPolicy(policy).build();
     return setIamPolicy(request);
@@ -791,7 +763,7 @@ public final Policy setIamPolicy(String resource, Policy policy) {
    *
    * 

    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   Policy policy = Policy.newBuilder().build();
    *   SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder()
    *     .setResource(formattedResource)
@@ -816,7 +788,7 @@ public final Policy setIamPolicy(SetIamPolicyRequest request) {
    *
    * 

    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   Policy policy = Policy.newBuilder().build();
    *   SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder()
    *     .setResource(formattedResource)
@@ -841,7 +813,7 @@ public final UnaryCallable setIamPolicyCallable() {
    *
    * 

    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   Policy response = publisherClient.getIamPolicy(formattedResource);
    * }
    * 
@@ -852,7 +824,7 @@ public final UnaryCallable setIamPolicyCallable() { * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final Policy getIamPolicy(String resource) { - TOPIC_PATH_TEMPLATE.validate(resource, "getIamPolicy"); + GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder().setResource(resource).build(); return getIamPolicy(request); } @@ -866,7 +838,7 @@ public final Policy getIamPolicy(String resource) { * *

    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder()
    *     .setResource(formattedResource)
    *     .build();
@@ -890,7 +862,7 @@ private final Policy getIamPolicy(GetIamPolicyRequest request) {
    *
    * 

    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder()
    *     .setResource(formattedResource)
    *     .build();
@@ -906,13 +878,14 @@ public final UnaryCallable getIamPolicyCallable() {
 
   // AUTO-GENERATED DOCUMENTATION AND METHOD
   /**
-   * Returns permissions that a caller has on the specified resource.
+   * Returns permissions that a caller has on the specified resource. If the resource does not
+   * exist, this will return an empty set of permissions, not a NOT_FOUND error.
    *
    * 

Sample code: * *


    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   List<String> permissions = new ArrayList<>();
    *   TestIamPermissionsResponse response = publisherClient.testIamPermissions(formattedResource, permissions);
    * }
@@ -928,7 +901,7 @@ public final UnaryCallable getIamPolicyCallable() {
    */
   public final TestIamPermissionsResponse testIamPermissions(
       String resource, List permissions) {
-    TOPIC_PATH_TEMPLATE.validate(resource, "testIamPermissions");
+
     TestIamPermissionsRequest request =
         TestIamPermissionsRequest.newBuilder()
             .setResource(resource)
@@ -939,13 +912,14 @@ public final TestIamPermissionsResponse testIamPermissions(
 
   // AUTO-GENERATED DOCUMENTATION AND METHOD
   /**
-   * Returns permissions that a caller has on the specified resource.
+   * Returns permissions that a caller has on the specified resource. If the resource does not
+   * exist, this will return an empty set of permissions, not a NOT_FOUND error.
    *
    * 

Sample code: * *


    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   List<String> permissions = new ArrayList<>();
    *   TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
    *     .setResource(formattedResource)
@@ -964,13 +938,14 @@ public final TestIamPermissionsResponse testIamPermissions(TestIamPermissionsReq
 
   // AUTO-GENERATED DOCUMENTATION AND METHOD
   /**
-   * Returns permissions that a caller has on the specified resource.
+   * Returns permissions that a caller has on the specified resource. If the resource does not
+   * exist, this will return an empty set of permissions, not a NOT_FOUND error.
    *
    * 

Sample code: * *


    * try (PublisherClient publisherClient = PublisherClient.create()) {
-   *   String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+   *   String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
    *   List<String> permissions = new ArrayList<>();
    *   TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
    *     .setResource(formattedResource)
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberClient.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberClient.java
index 8bbd1ff67167..23f723a79a31 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberClient.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberClient.java
@@ -18,8 +18,8 @@
 import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListSubscriptionsPagedResponse;
 
 import com.google.api.gax.grpc.ChannelAndExecutor;
+import com.google.api.gax.grpc.StreamingCallable;
 import com.google.api.gax.grpc.UnaryCallable;
-import com.google.api.gax.protobuf.PathTemplate;
 import com.google.iam.v1.GetIamPolicyRequest;
 import com.google.iam.v1.Policy;
 import com.google.iam.v1.SetIamPolicyRequest;
@@ -38,6 +38,8 @@
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.StreamingPullRequest;
+import com.google.pubsub.v1.StreamingPullResponse;
 import com.google.pubsub.v1.Subscription;
 import com.google.pubsub.v1.SubscriptionName;
 import com.google.pubsub.v1.TopicName;
@@ -55,6 +57,8 @@
  * Service Description: The service that an application uses to manipulate subscriptions and to
  * consume messages from a subscription via the `Pull` method.
  *
+ * 

To retrieve messages from a subscription, see the Subscriber class. + * *

This class provides the ability to make remote calls to the backing service through method * calls that map to API methods. Sample code to get started: * @@ -127,71 +131,14 @@ public class SubscriberClient implements AutoCloseable { private final UnaryCallable modifyAckDeadlineCallable; private final UnaryCallable acknowledgeCallable; private final UnaryCallable pullCallable; + private final StreamingCallable + streamingPullCallable; private final UnaryCallable modifyPushConfigCallable; private final UnaryCallable setIamPolicyCallable; private final UnaryCallable getIamPolicyCallable; private final UnaryCallable testIamPermissionsCallable; - private static final PathTemplate PROJECT_PATH_TEMPLATE = - PathTemplate.createWithoutUrlEncoding("projects/{project}"); - - private static final PathTemplate SUBSCRIPTION_PATH_TEMPLATE = - PathTemplate.createWithoutUrlEncoding("projects/{project}/subscriptions/{subscription}"); - - private static final PathTemplate TOPIC_PATH_TEMPLATE = - PathTemplate.createWithoutUrlEncoding("projects/{project}/topics/{topic}"); - - /** Formats a string containing the fully-qualified path to represent a project resource. */ - public static final String formatProjectName(String project) { - return PROJECT_PATH_TEMPLATE.instantiate("project", project); - } - - /** Formats a string containing the fully-qualified path to represent a subscription resource. */ - public static final String formatSubscriptionName(String project, String subscription) { - return SUBSCRIPTION_PATH_TEMPLATE.instantiate( - "project", project, - "subscription", subscription); - } - - /** Formats a string containing the fully-qualified path to represent a topic resource. */ - public static final String formatTopicName(String project, String topic) { - return TOPIC_PATH_TEMPLATE.instantiate( - "project", project, - "topic", topic); - } - - /** Parses the project from the given fully-qualified path which represents a project resource. */ - public static final String parseProjectFromProjectName(String projectName) { - return PROJECT_PATH_TEMPLATE.parse(projectName).get("project"); - } - - /** - * Parses the project from the given fully-qualified path which represents a subscription - * resource. - */ - public static final String parseProjectFromSubscriptionName(String subscriptionName) { - return SUBSCRIPTION_PATH_TEMPLATE.parse(subscriptionName).get("project"); - } - - /** - * Parses the subscription from the given fully-qualified path which represents a subscription - * resource. - */ - public static final String parseSubscriptionFromSubscriptionName(String subscriptionName) { - return SUBSCRIPTION_PATH_TEMPLATE.parse(subscriptionName).get("subscription"); - } - - /** Parses the project from the given fully-qualified path which represents a topic resource. */ - public static final String parseProjectFromTopicName(String topicName) { - return TOPIC_PATH_TEMPLATE.parse(topicName).get("project"); - } - - /** Parses the topic from the given fully-qualified path which represents a topic resource. */ - public static final String parseTopicFromTopicName(String topicName) { - return TOPIC_PATH_TEMPLATE.parse(topicName).get("topic"); - } - /** Constructs an instance of SubscriberClient with default settings. */ public static final SubscriberClient create() throws IOException { return create(SubscriberSettings.defaultBuilder().build()); @@ -231,6 +178,8 @@ protected SubscriberClient(SubscriberSettings settings) throws IOException { this.acknowledgeCallable = UnaryCallable.create(settings.acknowledgeSettings(), this.channel, this.executor); this.pullCallable = UnaryCallable.create(settings.pullSettings(), this.channel, this.executor); + this.streamingPullCallable = + StreamingCallable.create(settings.streamingPullSettings(), this.channel); this.modifyPushConfigCallable = UnaryCallable.create(settings.modifyPushConfigSettings(), this.channel, this.executor); this.setIamPolicyCallable = @@ -270,8 +219,10 @@ public final SubscriberSettings getSettings() { * `ALREADY_EXISTS`. If the corresponding topic doesn't exist, returns `NOT_FOUND`. * *

If the name is not provided in the request, the server will assign a random name for this - * subscription on the same project as the topic. Note that for REST API requests, you must - * specify a name. + * subscription on the same project as the topic, conforming to the [resource name + * format](https://cloud.google.com/pubsub/docs/overview#names). The generated name is populated + * in the returned Subscription object. Note that for REST API requests, you must specify a name + * in the request. * *

Sample code: * @@ -290,8 +241,9 @@ public final SubscriberSettings getSettings() { * letter, and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), underscores * (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent signs (`%`). It must be between 3 * and 255 characters in length, and it must not start with `"goog"`. - * @param topic The name of the topic from which this subscription is receiving messages. The - * value of this field will be `_deleted-topic_` if the topic has been deleted. + * @param topic The name of the topic from which this subscription is receiving messages. Format + * is `projects/{project}/topics/{topic}`. The value of this field will be `_deleted-topic_` + * if the topic has been deleted. * @param pushConfig If push delivery is used with this subscription, this field is used to * configure it. An empty `pushConfig` signifies that the subscriber will pull and ack * messages using API methods. @@ -301,13 +253,13 @@ public final SubscriberSettings getSettings() { * and will not be delivered again during that time (on a best-effort basis). *

For pull subscriptions, this value is used as the initial value for the ack deadline. To * override this value for a given message, call `ModifyAckDeadline` with the corresponding - * `ack_id` if using pull. The maximum custom deadline you can specify is 600 seconds (10 - * minutes). + * `ack_id` if using pull. The minimum custom deadline you can specify is 10 seconds. The + * maximum custom deadline you can specify is 600 seconds (10 minutes). If this parameter is + * 0, a default value of 10 seconds is used. *

For push delivery, this value is also used to set the request timeout for the call to * the push endpoint. *

If the subscriber never acknowledges the message, the Pub/Sub system will eventually * redeliver the message. - *

If this parameter is 0, a default value of 10 seconds is used. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final Subscription createSubscription( @@ -316,7 +268,7 @@ public final Subscription createSubscription( Subscription request = Subscription.newBuilder() .setNameWithSubscriptionName(name) - .setTopicWithTopicNameOneof(TopicNameOneof.from(topic)) + .setTopicWithTopicNameOneof(topic == null ? null : TopicNameOneof.from(topic)) .setPushConfig(pushConfig) .setAckDeadlineSeconds(ackDeadlineSeconds) .build(); @@ -329,8 +281,10 @@ public final Subscription createSubscription( * `ALREADY_EXISTS`. If the corresponding topic doesn't exist, returns `NOT_FOUND`. * *

If the name is not provided in the request, the server will assign a random name for this - * subscription on the same project as the topic. Note that for REST API requests, you must - * specify a name. + * subscription on the same project as the topic, conforming to the [resource name + * format](https://cloud.google.com/pubsub/docs/overview#names). The generated name is populated + * in the returned Subscription object. Note that for REST API requests, you must specify a name + * in the request. * *

Sample code: * @@ -359,8 +313,10 @@ public final Subscription createSubscription(Subscription request) { * `ALREADY_EXISTS`. If the corresponding topic doesn't exist, returns `NOT_FOUND`. * *

If the name is not provided in the request, the server will assign a random name for this - * subscription on the same project as the topic. Note that for REST API requests, you must - * specify a name. + * subscription on the same project as the topic, conforming to the [resource name + * format](https://cloud.google.com/pubsub/docs/overview#names). The generated name is populated + * in the returned Subscription object. Note that for REST API requests, you must specify a name + * in the request. * *

Sample code: * @@ -395,7 +351,8 @@ public final UnaryCallable createSubscriptionCallabl * } *

* - * @param subscription The name of the subscription to get. + * @param subscription The name of the subscription to get. Format is + * `projects/{project}/subscriptions/{sub}`. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final Subscription getSubscription(SubscriptionName subscription) { @@ -467,7 +424,8 @@ public final UnaryCallable getSubscription * } *
* - * @param project The name of the cloud project that subscriptions belong to. + * @param project The name of the cloud project that subscriptions belong to. Format is + * `projects/{project}`. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final ListSubscriptionsPagedResponse listSubscriptions(ProjectName project) { @@ -560,10 +518,10 @@ public final ListSubscriptionsPagedResponse listSubscriptions(ListSubscriptionsR // AUTO-GENERATED DOCUMENTATION AND METHOD /** - * Deletes an existing subscription. All pending messages in the subscription are immediately + * Deletes an existing subscription. All messages retained in the subscription are immediately * dropped. Calls to `Pull` after deletion will return `NOT_FOUND`. After a subscription is * deleted, a new one may be created with the same name, but the new one has no association with - * the old subscription, or its topic unless the same topic is specified. + * the old subscription or its topic unless the same topic is specified. * *

Sample code: * @@ -574,7 +532,8 @@ public final ListSubscriptionsPagedResponse listSubscriptions(ListSubscriptionsR * } *

* - * @param subscription The subscription to delete. + * @param subscription The subscription to delete. Format is + * `projects/{project}/subscriptions/{sub}`. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final void deleteSubscription(SubscriptionName subscription) { @@ -588,10 +547,10 @@ public final void deleteSubscription(SubscriptionName subscription) { // AUTO-GENERATED DOCUMENTATION AND METHOD /** - * Deletes an existing subscription. All pending messages in the subscription are immediately + * Deletes an existing subscription. All messages retained in the subscription are immediately * dropped. Calls to `Pull` after deletion will return `NOT_FOUND`. After a subscription is * deleted, a new one may be created with the same name, but the new one has no association with - * the old subscription, or its topic unless the same topic is specified. + * the old subscription or its topic unless the same topic is specified. * *

Sample code: * @@ -614,10 +573,10 @@ private final void deleteSubscription(DeleteSubscriptionRequest request) { // AUTO-GENERATED DOCUMENTATION AND METHOD /** - * Deletes an existing subscription. All pending messages in the subscription are immediately + * Deletes an existing subscription. All messages retained in the subscription are immediately * dropped. Calls to `Pull` after deletion will return `NOT_FOUND`. After a subscription is * deleted, a new one may be created with the same name, but the new one has no association with - * the old subscription, or its topic unless the same topic is specified. + * the old subscription or its topic unless the same topic is specified. * *

Sample code: * @@ -655,12 +614,14 @@ public final UnaryCallable deleteSubscriptionC * } *

* - * @param subscription The name of the subscription. + * @param subscription The name of the subscription. Format is + * `projects/{project}/subscriptions/{sub}`. * @param ackIds List of acknowledgment IDs. * @param ackDeadlineSeconds The new ack deadline with respect to the time this request was sent - * to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack - * deadline will expire 10 seconds after the `ModifyAckDeadline` call was made. Specifying - * zero may immediately make the message available for another pull request. + * to the Pub/Sub system. For example, if the value is 10, the new ack deadline will expire 10 + * seconds after the `ModifyAckDeadline` call was made. Specifying zero may immediately make + * the message available for another pull request. The minimum deadline you can specify is 0 + * seconds. The maximum deadline you can specify is 600 seconds (10 minutes). * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final void modifyAckDeadline( @@ -752,7 +713,8 @@ public final UnaryCallable modifyAckDeadlineCal * } *
* - * @param subscription The subscription whose message is being acknowledged. + * @param subscription The subscription whose message is being acknowledged. Format is + * `projects/{project}/subscriptions/{sub}`. * @param ackIds The acknowledgment ID for the messages being acknowledged that was returned by * the Pub/Sub system in the `Pull` response. Must not be empty. * @throws com.google.api.gax.grpc.ApiException if the remote call fails @@ -841,16 +803,18 @@ public final UnaryCallable acknowledgeCallable() { * } *
* - * @param subscription The subscription from which messages should be pulled. - * @param returnImmediately If this is specified as true the system will respond immediately even - * if it is not able to return a message in the `Pull` response. Otherwise the system is - * allowed to wait until at least one message is available rather than returning no messages. - * The client may cancel the request if it does not wish to wait any longer for the response. + * @param subscription The subscription from which messages should be pulled. Format is + * `projects/{project}/subscriptions/{sub}`. + * @param returnImmediately If this field set to true, the system will respond immediately even if + * it there are no messages available to return in the `Pull` response. Otherwise, the system + * may wait (for a bounded amount of time) until at least one message is available, rather + * than returning no messages. The client may cancel the request if it does not wish to wait + * any longer for the response. * @param maxMessages The maximum number of messages returned for this request. The Pub/Sub system * may return fewer than the number specified. * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ - public final PullResponse pull( + /* package-private */ final PullResponse pull( SubscriptionName subscription, boolean returnImmediately, int maxMessages) { PullRequest request = @@ -911,10 +875,63 @@ public final PullResponse pull(PullRequest request) { * } *
*/ - public final UnaryCallable pullCallable() { + /* package-private */ final UnaryCallable pullCallable() { return pullCallable; } + // AUTO-GENERATED DOCUMENTATION AND METHOD + /** + * (EXPERIMENTAL) StreamingPull is an experimental feature. This RPC will respond with + * UNIMPLEMENTED errors unless you have been invited to test this feature. Contact + * cloud-pubsub{@literal @}google.com with any questions. + * + *

Establishes a stream with the server, which sends messages down to the client. The client + * streams acknowledgements and ack deadline modifications back to the server. The server will + * close the stream and return the status on any error. The server may close the stream with + * status `OK` to reassign server-side resources, in which case, the client should re-establish + * the stream. `UNAVAILABLE` may also be returned in the case of a transient error (e.g., a server + * restart). These should also be retried by the client. Flow control can be achieved by + * configuring the underlying RPC channel. + * + *

Sample code: + * + *


+   * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
+   *   StreamObserver<StreamingPullResponse> responseObserver =
+   *       new StreamObserver<StreamingPullResponse>() {
+   *         {@literal @}Override
+   *         public void onNext(StreamingPullResponse response) {
+   *           // Do something when receive a response
+   *         }
+   *
+   *         {@literal @}Override
+   *         public void onError(Throwable t) {
+   *           // Add error-handling
+   *         }
+   *
+   *         {@literal @}Override
+   *         public void onCompleted() {
+   *           // Do something when complete.
+   *         }
+   *       };
+   *   StreamObserver<StreamingRecognizeRequest> requestObserver =
+   *       subscriberClient.streamingPullCallable().bidiStreamingCall(responseObserver)});
+   *
+   *   SubscriptionName subscription = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]");
+   *   int streamAckDeadlineSeconds = 0;
+   *   StreamingPullRequest request = StreamingPullRequest.newBuilder()
+   *     .setSubscriptionWithSubscriptionName(subscription)
+   *     .setStreamAckDeadlineSeconds(streamAckDeadlineSeconds)
+   *     .build();
+   *   requestObserver.onNext(request);
+   * }
+   * 
+ */ + /* package-private */ final StreamingCallable + streamingPullCallable() { + return streamingPullCallable; + } + // AUTO-GENERATED DOCUMENTATION AND METHOD /** * Modifies the `PushConfig` for a specified subscription. @@ -934,7 +951,8 @@ public final UnaryCallable pullCallable() { * } *
* - * @param subscription The name of the subscription. + * @param subscription The name of the subscription. Format is + * `projects/{project}/subscriptions/{sub}`. * @param pushConfig The push configuration for future deliveries. *

An empty `pushConfig` indicates that the Pub/Sub system should stop pushing messages * from the given subscription and allow messages to be pulled and acknowledged - effectively @@ -1018,7 +1036,7 @@ public final UnaryCallable modifyPushConfigCalla * *


    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   Policy policy = Policy.newBuilder().build();
    *   Policy response = subscriberClient.setIamPolicy(formattedResource, policy);
    * }
@@ -1033,7 +1051,7 @@ public final UnaryCallable modifyPushConfigCalla
    * @throws com.google.api.gax.grpc.ApiException if the remote call fails
    */
   public final Policy setIamPolicy(String resource, Policy policy) {
-    SUBSCRIPTION_PATH_TEMPLATE.validate(resource, "setIamPolicy");
+
     SetIamPolicyRequest request =
         SetIamPolicyRequest.newBuilder().setResource(resource).setPolicy(policy).build();
     return setIamPolicy(request);
@@ -1047,7 +1065,7 @@ public final Policy setIamPolicy(String resource, Policy policy) {
    *
    * 

    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   Policy policy = Policy.newBuilder().build();
    *   SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder()
    *     .setResource(formattedResource)
@@ -1072,7 +1090,7 @@ public final Policy setIamPolicy(SetIamPolicyRequest request) {
    *
    * 

    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   Policy policy = Policy.newBuilder().build();
    *   SetIamPolicyRequest request = SetIamPolicyRequest.newBuilder()
    *     .setResource(formattedResource)
@@ -1097,7 +1115,7 @@ public final UnaryCallable setIamPolicyCallable() {
    *
    * 

    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   Policy response = subscriberClient.getIamPolicy(formattedResource);
    * }
    * 
@@ -1108,7 +1126,7 @@ public final UnaryCallable setIamPolicyCallable() { * @throws com.google.api.gax.grpc.ApiException if the remote call fails */ public final Policy getIamPolicy(String resource) { - SUBSCRIPTION_PATH_TEMPLATE.validate(resource, "getIamPolicy"); + GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder().setResource(resource).build(); return getIamPolicy(request); } @@ -1122,7 +1140,7 @@ public final Policy getIamPolicy(String resource) { * *

    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder()
    *     .setResource(formattedResource)
    *     .build();
@@ -1146,7 +1164,7 @@ private final Policy getIamPolicy(GetIamPolicyRequest request) {
    *
    * 

    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   GetIamPolicyRequest request = GetIamPolicyRequest.newBuilder()
    *     .setResource(formattedResource)
    *     .build();
@@ -1162,13 +1180,14 @@ public final UnaryCallable getIamPolicyCallable() {
 
   // AUTO-GENERATED DOCUMENTATION AND METHOD
   /**
-   * Returns permissions that a caller has on the specified resource.
+   * Returns permissions that a caller has on the specified resource. If the resource does not
+   * exist, this will return an empty set of permissions, not a NOT_FOUND error.
    *
    * 

Sample code: * *


    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   List<String> permissions = new ArrayList<>();
    *   TestIamPermissionsResponse response = subscriberClient.testIamPermissions(formattedResource, permissions);
    * }
@@ -1184,7 +1203,7 @@ public final UnaryCallable getIamPolicyCallable() {
    */
   public final TestIamPermissionsResponse testIamPermissions(
       String resource, List permissions) {
-    SUBSCRIPTION_PATH_TEMPLATE.validate(resource, "testIamPermissions");
+
     TestIamPermissionsRequest request =
         TestIamPermissionsRequest.newBuilder()
             .setResource(resource)
@@ -1195,13 +1214,14 @@ public final TestIamPermissionsResponse testIamPermissions(
 
   // AUTO-GENERATED DOCUMENTATION AND METHOD
   /**
-   * Returns permissions that a caller has on the specified resource.
+   * Returns permissions that a caller has on the specified resource. If the resource does not
+   * exist, this will return an empty set of permissions, not a NOT_FOUND error.
    *
    * 

Sample code: * *


    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   List<String> permissions = new ArrayList<>();
    *   TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
    *     .setResource(formattedResource)
@@ -1220,13 +1240,14 @@ public final TestIamPermissionsResponse testIamPermissions(TestIamPermissionsReq
 
   // AUTO-GENERATED DOCUMENTATION AND METHOD
   /**
-   * Returns permissions that a caller has on the specified resource.
+   * Returns permissions that a caller has on the specified resource. If the resource does not
+   * exist, this will return an empty set of permissions, not a NOT_FOUND error.
    *
    * 

Sample code: * *


    * try (SubscriberClient subscriberClient = SubscriberClient.create()) {
-   *   String formattedResource = SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+   *   String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
    *   List<String> permissions = new ArrayList<>();
    *   TestIamPermissionsRequest request = TestIamPermissionsRequest.newBuilder()
    *     .setResource(formattedResource)
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberSettings.java
index 971e00f2df72..1366d8e10e90 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberSettings.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/SubscriberSettings.java
@@ -29,6 +29,7 @@
 import com.google.api.gax.grpc.PagedListDescriptor;
 import com.google.api.gax.grpc.PagedListResponseFactory;
 import com.google.api.gax.grpc.SimpleCallSettings;
+import com.google.api.gax.grpc.StreamingCallSettings;
 import com.google.api.gax.grpc.UnaryCallSettings;
 import com.google.api.gax.grpc.UnaryCallable;
 import com.google.common.collect.ImmutableList;
@@ -53,6 +54,8 @@
 import com.google.pubsub.v1.ModifyPushConfigRequest;
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.StreamingPullRequest;
+import com.google.pubsub.v1.StreamingPullResponse;
 import com.google.pubsub.v1.SubscriberGrpc;
 import com.google.pubsub.v1.Subscription;
 import io.grpc.Status;
@@ -111,6 +114,8 @@ public class SubscriberSettings extends ClientSettings {
   private final SimpleCallSettings modifyAckDeadlineSettings;
   private final SimpleCallSettings acknowledgeSettings;
   private final SimpleCallSettings pullSettings;
+  private final StreamingCallSettings
+      streamingPullSettings;
   private final SimpleCallSettings modifyPushConfigSettings;
   private final SimpleCallSettings setIamPolicySettings;
   private final SimpleCallSettings getIamPolicySettings;
@@ -154,6 +159,12 @@ public SimpleCallSettings pullSettings() {
     return pullSettings;
   }
 
+  /** Returns the object with the settings used for calls to streamingPull. */
+  public StreamingCallSettings
+      streamingPullSettings() {
+    return streamingPullSettings;
+  }
+
   /** Returns the object with the settings used for calls to modifyPushConfig. */
   public SimpleCallSettings modifyPushConfigSettings() {
     return modifyPushConfigSettings;
@@ -233,6 +244,7 @@ private SubscriberSettings(Builder settingsBuilder) throws IOException {
     modifyAckDeadlineSettings = settingsBuilder.modifyAckDeadlineSettings().build();
     acknowledgeSettings = settingsBuilder.acknowledgeSettings().build();
     pullSettings = settingsBuilder.pullSettings().build();
+    streamingPullSettings = settingsBuilder.streamingPullSettings().build();
     modifyPushConfigSettings = settingsBuilder.modifyPushConfigSettings().build();
     setIamPolicySettings = settingsBuilder.setIamPolicySettings().build();
     getIamPolicySettings = settingsBuilder.getIamPolicySettings().build();
@@ -311,6 +323,8 @@ public static class Builder extends ClientSettings.Builder {
         modifyAckDeadlineSettings;
     private final SimpleCallSettings.Builder acknowledgeSettings;
     private final SimpleCallSettings.Builder pullSettings;
+    private final StreamingCallSettings.Builder
+        streamingPullSettings;
     private final SimpleCallSettings.Builder
         modifyPushConfigSettings;
     private final SimpleCallSettings.Builder setIamPolicySettings;
@@ -382,6 +396,9 @@ private Builder() {
 
       pullSettings = SimpleCallSettings.newBuilder(SubscriberGrpc.METHOD_PULL);
 
+      streamingPullSettings =
+          StreamingCallSettings.newBuilder(SubscriberGrpc.METHOD_STREAMING_PULL);
+
       modifyPushConfigSettings =
           SimpleCallSettings.newBuilder(SubscriberGrpc.METHOD_MODIFY_PUSH_CONFIG);
 
@@ -478,6 +495,7 @@ private Builder(SubscriberSettings settings) {
       modifyAckDeadlineSettings = settings.modifyAckDeadlineSettings.toBuilder();
       acknowledgeSettings = settings.acknowledgeSettings.toBuilder();
       pullSettings = settings.pullSettings.toBuilder();
+      streamingPullSettings = settings.streamingPullSettings.toBuilder();
       modifyPushConfigSettings = settings.modifyPushConfigSettings.toBuilder();
       setIamPolicySettings = settings.setIamPolicySettings.toBuilder();
       getIamPolicySettings = settings.getIamPolicySettings.toBuilder();
@@ -561,6 +579,12 @@ public SimpleCallSettings.Builder pullSettings() {
       return pullSettings;
     }
 
+    /** Returns the builder for the settings used for calls to streamingPull. */
+    public StreamingCallSettings.Builder
+        streamingPullSettings() {
+      return streamingPullSettings;
+    }
+
     /** Returns the builder for the settings used for calls to modifyPushConfig. */
     public SimpleCallSettings.Builder modifyPushConfigSettings() {
       return modifyPushConfigSettings;
diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/package-info.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/package-info.java
index d85d557aeff1..057a948b8683 100644
--- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/package-info.java
+++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/spi/v1/package-info.java
@@ -24,6 +24,8 @@
  * 

Service Description: The service that an application uses to manipulate topics, and to send * messages to a topic. * + *

To publish messages to a topic, see the Publisher class. + * *

Sample for PublisherClient: * *

@@ -40,6 +42,8 @@
  * 

Service Description: The service that an application uses to manipulate subscriptions and to * consume messages from a subscription via the `Pull` method. * + *

To retrieve messages from a subscription, see the Subscriber class. + * *

Sample for SubscriberClient: * *

diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/MockSubscriberImpl.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/MockSubscriberImpl.java
index 2bfcd52631d0..cb589e8e0028 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/MockSubscriberImpl.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/MockSubscriberImpl.java
@@ -26,6 +26,8 @@
 import com.google.pubsub.v1.ModifyPushConfigRequest;
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
+import com.google.pubsub.v1.StreamingPullRequest;
+import com.google.pubsub.v1.StreamingPullResponse;
 import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase;
 import com.google.pubsub.v1.Subscription;
 import io.grpc.stub.StreamObserver;
@@ -169,6 +171,36 @@ public void pull(PullRequest request, StreamObserver responseObser
     }
   }
 
+  @Override
+  public StreamObserver streamingPull(
+      final StreamObserver responseObserver) {
+    final Object response = responses.remove();
+    StreamObserver requestObserver =
+        new StreamObserver() {
+          @Override
+          public void onNext(StreamingPullRequest value) {
+            if (response instanceof StreamingPullResponse) {
+              responseObserver.onNext((StreamingPullResponse) response);
+            } else if (response instanceof Exception) {
+              responseObserver.onError((Exception) response);
+            } else {
+              responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
+            }
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            responseObserver.onError(t);
+          }
+
+          @Override
+          public void onCompleted() {
+            responseObserver.onCompleted();
+          }
+        };
+    return requestObserver;
+  }
+
   @Override
   public void modifyPushConfig(
       ModifyPushConfigRequest request, StreamObserver responseObserver) {
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherTest.java
index 45066d67c31b..dda2496c08ab 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/PublisherTest.java
@@ -345,7 +345,7 @@ public void setIamPolicyTest() {
     Policy expectedResponse = Policy.newBuilder().setVersion(version).setEtag(etag).build();
     mockIAMPolicy.addResponse(expectedResponse);
 
-    String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+    String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
     Policy policy = Policy.newBuilder().build();
 
     Policy actualResponse = client.setIamPolicy(formattedResource, policy);
@@ -366,7 +366,7 @@ public void setIamPolicyExceptionTest() throws Exception {
     mockIAMPolicy.addException(exception);
 
     try {
-      String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+      String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
       Policy policy = Policy.newBuilder().build();
 
       client.setIamPolicy(formattedResource, policy);
@@ -384,7 +384,7 @@ public void getIamPolicyTest() {
     Policy expectedResponse = Policy.newBuilder().setVersion(version).setEtag(etag).build();
     mockIAMPolicy.addResponse(expectedResponse);
 
-    String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+    String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
 
     Policy actualResponse = client.getIamPolicy(formattedResource);
     Assert.assertEquals(expectedResponse, actualResponse);
@@ -403,7 +403,7 @@ public void getIamPolicyExceptionTest() throws Exception {
     mockIAMPolicy.addException(exception);
 
     try {
-      String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+      String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
 
       client.getIamPolicy(formattedResource);
       Assert.fail("No exception raised");
@@ -418,7 +418,7 @@ public void testIamPermissionsTest() {
     TestIamPermissionsResponse expectedResponse = TestIamPermissionsResponse.newBuilder().build();
     mockIAMPolicy.addResponse(expectedResponse);
 
-    String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+    String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
     List permissions = new ArrayList<>();
 
     TestIamPermissionsResponse actualResponse =
@@ -440,7 +440,7 @@ public void testIamPermissionsExceptionTest() throws Exception {
     mockIAMPolicy.addException(exception);
 
     try {
-      String formattedResource = PublisherClient.formatTopicName("[PROJECT]", "[TOPIC]");
+      String formattedResource = TopicName.create("[PROJECT]", "[TOPIC]").toString();
       List permissions = new ArrayList<>();
 
       client.testIamPermissions(formattedResource, permissions);
diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java
index 7da01dd91701..9b7597bfcff7 100644
--- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java
+++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/spi/v1/SubscriberTest.java
@@ -18,8 +18,10 @@
 import static com.google.cloud.pubsub.spi.v1.PagedResponseWrappers.ListSubscriptionsPagedResponse;
 
 import com.google.api.gax.grpc.ApiException;
+import com.google.api.gax.grpc.StreamingCallable;
 import com.google.api.gax.testing.MockGrpcService;
 import com.google.api.gax.testing.MockServiceHelper;
+import com.google.api.gax.testing.MockStreamObserver;
 import com.google.common.collect.Lists;
 import com.google.iam.v1.GetIamPolicyRequest;
 import com.google.iam.v1.Policy;
@@ -40,16 +42,20 @@
 import com.google.pubsub.v1.PullRequest;
 import com.google.pubsub.v1.PullResponse;
 import com.google.pubsub.v1.PushConfig;
+import com.google.pubsub.v1.StreamingPullRequest;
+import com.google.pubsub.v1.StreamingPullResponse;
 import com.google.pubsub.v1.Subscription;
 import com.google.pubsub.v1.SubscriptionName;
 import com.google.pubsub.v1.TopicName;
 import com.google.pubsub.v1.TopicNameOneof;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -383,6 +389,66 @@ public void pullExceptionTest() throws Exception {
     }
   }
 
+  @Test
+  @SuppressWarnings("all")
+  public void streamingPullTest() throws Exception {
+    StreamingPullResponse expectedResponse = StreamingPullResponse.newBuilder().build();
+    mockSubscriber.addResponse(expectedResponse);
+    SubscriptionName subscription = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]");
+    int streamAckDeadlineSeconds = 1875467245;
+    StreamingPullRequest request =
+        StreamingPullRequest.newBuilder()
+            .setSubscriptionWithSubscriptionName(subscription)
+            .setStreamAckDeadlineSeconds(streamAckDeadlineSeconds)
+            .build();
+
+    MockStreamObserver responseObserver = new MockStreamObserver<>();
+
+    StreamingCallable callable =
+        client.streamingPullCallable();
+    StreamObserver requestObserver =
+        callable.bidiStreamingCall(responseObserver);
+
+    requestObserver.onNext(request);
+    requestObserver.onCompleted();
+
+    List actualResponses = responseObserver.future().get();
+    Assert.assertEquals(1, actualResponses.size());
+    Assert.assertEquals(expectedResponse, actualResponses.get(0));
+  }
+
+  @Test
+  @SuppressWarnings("all")
+  public void streamingPullExceptionTest() throws Exception {
+    StatusRuntimeException exception = new StatusRuntimeException(Status.INTERNAL);
+    mockSubscriber.addException(exception);
+    SubscriptionName subscription = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]");
+    int streamAckDeadlineSeconds = 1875467245;
+    StreamingPullRequest request =
+        StreamingPullRequest.newBuilder()
+            .setSubscriptionWithSubscriptionName(subscription)
+            .setStreamAckDeadlineSeconds(streamAckDeadlineSeconds)
+            .build();
+
+    MockStreamObserver responseObserver = new MockStreamObserver<>();
+
+    StreamingCallable callable =
+        client.streamingPullCallable();
+    StreamObserver requestObserver =
+        callable.bidiStreamingCall(responseObserver);
+
+    requestObserver.onNext(request);
+
+    try {
+      List actualResponses = responseObserver.future().get();
+      Assert.fail("No exception thrown");
+    } catch (ExecutionException e) {
+      Assert.assertTrue(e.getCause() instanceof StatusRuntimeException);
+      StatusRuntimeException statusException = (StatusRuntimeException) e.getCause();
+      Assert.assertEquals(Status.INTERNAL, statusException.getStatus());
+    }
+  }
+
   @Test
   @SuppressWarnings("all")
   public void modifyPushConfigTest() {
@@ -427,8 +493,7 @@ public void setIamPolicyTest() {
     Policy expectedResponse = Policy.newBuilder().setVersion(version).setEtag(etag).build();
     mockIAMPolicy.addResponse(expectedResponse);
 
-    String formattedResource =
-        SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+    String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
     Policy policy = Policy.newBuilder().build();
 
     Policy actualResponse = client.setIamPolicy(formattedResource, policy);
@@ -449,8 +514,7 @@ public void setIamPolicyExceptionTest() throws Exception {
     mockIAMPolicy.addException(exception);
 
     try {
-      String formattedResource =
-          SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+      String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
       Policy policy = Policy.newBuilder().build();
 
       client.setIamPolicy(formattedResource, policy);
@@ -468,8 +532,7 @@ public void getIamPolicyTest() {
     Policy expectedResponse = Policy.newBuilder().setVersion(version).setEtag(etag).build();
     mockIAMPolicy.addResponse(expectedResponse);
 
-    String formattedResource =
-        SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+    String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
 
     Policy actualResponse = client.getIamPolicy(formattedResource);
     Assert.assertEquals(expectedResponse, actualResponse);
@@ -488,8 +551,7 @@ public void getIamPolicyExceptionTest() throws Exception {
     mockIAMPolicy.addException(exception);
 
     try {
-      String formattedResource =
-          SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+      String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
 
       client.getIamPolicy(formattedResource);
       Assert.fail("No exception raised");
@@ -504,8 +566,7 @@ public void testIamPermissionsTest() {
     TestIamPermissionsResponse expectedResponse = TestIamPermissionsResponse.newBuilder().build();
     mockIAMPolicy.addResponse(expectedResponse);
 
-    String formattedResource =
-        SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+    String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
     List permissions = new ArrayList<>();
 
     TestIamPermissionsResponse actualResponse =
@@ -527,8 +588,7 @@ public void testIamPermissionsExceptionTest() throws Exception {
     mockIAMPolicy.addException(exception);
 
     try {
-      String formattedResource =
-          SubscriberClient.formatSubscriptionName("[PROJECT]", "[SUBSCRIPTION]");
+      String formattedResource = SubscriptionName.create("[PROJECT]", "[SUBSCRIPTION]").toString();
       List permissions = new ArrayList<>();
 
       client.testIamPermissions(formattedResource, permissions);