diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index aad9446f06eb..bb50e18813de 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -17,16 +17,16 @@ package com.google.cloud.pubsub.v1; import com.google.api.core.ApiClock; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; import com.google.api.gax.batching.FlowController.FlowControlException; import com.google.api.gax.core.Distribution; import com.google.cloud.pubsub.v1.MessageDispatcher.OutstandingMessageBatch.OutstandingMessage; -import com.google.common.collect.ArrayListMultimap; import com.google.common.primitives.Ints; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.pubsub.v1.PubsubMessage; import com.google.pubsub.v1.ReceivedMessage; import java.util.ArrayList; @@ -129,7 +129,7 @@ public enum AckReply { } /** Handles callbacks for acking/nacking messages from the {@link MessageReceiver}. */ - private class AckHandler implements FutureCallback { + private class AckHandler implements ApiFutureCallback { private final String ackId; private final int outstandingBytes; private final long receivedTimeMillis; @@ -379,7 +379,7 @@ public void processOutstandingBatches() { final PubsubMessage message = outstandingMessage.receivedMessage().getMessage(); final AckHandler ackHandler = outstandingMessage.ackHandler(); - final SettableFuture response = SettableFuture.create(); + final SettableApiFuture response = SettableApiFuture.create(); final AckReplyConsumer consumer = new AckReplyConsumer() { @Override @@ -392,7 +392,7 @@ public void nack() { response.set(AckReply.NACK); } }; - Futures.addCallback(response, ackHandler); + ApiFutures.addCallback(response, ackHandler, MoreExecutors.directExecutor()); executor.execute( new Runnable() { @Override diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index b9854fe5f683..17a913935392 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -18,7 +18,10 @@ import com.google.api.core.AbstractApiService; import com.google.api.core.ApiClock; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; import com.google.api.core.InternalApi; +import com.google.api.core.SettableApiFuture; import com.google.api.gax.batching.FlowController; import com.google.api.gax.core.Distribution; import com.google.api.gax.grpc.GrpcStatusCode; @@ -27,9 +30,7 @@ import com.google.cloud.pubsub.v1.MessageDispatcher.AckProcessor; import com.google.cloud.pubsub.v1.MessageDispatcher.PendingModifyAckDeadline; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.Empty; import com.google.pubsub.v1.AcknowledgeRequest; import com.google.pubsub.v1.ModifyAckDeadlineRequest; @@ -127,7 +128,7 @@ protected void doStop() { private class StreamingPullResponseObserver implements ClientResponseObserver { - final SettableFuture errorFuture; + final SettableApiFuture errorFuture; /** * When a batch finsihes processing, we want to request one more batch from the server. But by @@ -138,7 +139,7 @@ private class StreamingPullResponseObserver */ ClientCallStreamObserver thisRequestObserver; - StreamingPullResponseObserver(SettableFuture errorFuture) { + StreamingPullResponseObserver(SettableApiFuture errorFuture) { this.errorFuture = errorFuture; } @@ -186,7 +187,7 @@ public void onCompleted() { } private void initialize() { - final SettableFuture errorFuture = SettableFuture.create(); + final SettableApiFuture errorFuture = SettableApiFuture.create(); final ClientResponseObserver responseObserver = new StreamingPullResponseObserver(errorFuture); final ClientCallStreamObserver requestObserver = @@ -215,9 +216,9 @@ private void initialize() { lock.unlock(); } - Futures.addCallback( + ApiFutures.addCallback( errorFuture, - new FutureCallback() { + new ApiFutureCallback() { @Override public void onSuccess(@Nullable Void result) { if (!isAlive()) { @@ -260,7 +261,8 @@ public void run() { backoffMillis, TimeUnit.MILLISECONDS); } - }); + }, + MoreExecutors.directExecutor()); } private boolean isAlive() {