diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 5775278dcb6c..a3cf1ae98bc4 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -26,8 +26,10 @@ import com.google.api.gax.batching.FlowControlSettings; import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; +import com.google.api.gax.core.BackgroundResource; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.Distribution; +import com.google.api.gax.core.ExecutorAsBackgroundResource; import com.google.api.gax.core.ExecutorProvider; import com.google.api.gax.core.InstantiatingExecutorProvider; import com.google.api.gax.rpc.HeaderProvider; @@ -117,7 +119,7 @@ public class Subscriber extends AbstractApiService { private final MessageReceiver receiver; private final List streamingSubscriberConnections; private final ApiClock clock; - private final List closeables = new ArrayList<>(); + private final List backgroundResources = new ArrayList<>(); private Subscriber(Builder builder) { receiver = builder.receiver; @@ -143,13 +145,7 @@ private Subscriber(Builder builder) { alarmsExecutor = systemExecutorProvider.getExecutor(); if (systemExecutorProvider.shouldAutoClose()) { - closeables.add( - new AutoCloseable() { - @Override - public void close() { - alarmsExecutor.shutdown(); - } - }); + backgroundResources.add(new ExecutorAsBackgroundResource((alarmsExecutor))); } TransportChannelProvider channelProvider = builder.channelProvider; @@ -298,8 +294,8 @@ public void run() { try { // stop connection is no-op if connections haven't been started. stopAllStreamingConnections(); - for (AutoCloseable closeable : closeables) { - closeable.close(); + for (BackgroundResource resource : backgroundResources) { + resource.shutdown(); } notifyStopped(); } catch (Exception e) { @@ -315,13 +311,7 @@ private void startStreamingConnections() { for (int i = 0; i < numPullers; i++) { final ScheduledExecutorService executor = executorProvider.getExecutor(); if (executorProvider.shouldAutoClose()) { - closeables.add( - new AutoCloseable() { - @Override - public void close() { - executor.shutdown(); - } - }); + backgroundResources.add(new ExecutorAsBackgroundResource((executor))); } streamingSubscriberConnections.add(