diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 3400c259e950..ad622833c4b6 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -104,10 +104,6 @@ private PendingModifyAckDeadline(int deadlineExtensionSeconds, Collection { private final long receivedTimeMillis; private final Instant totalExpiration; - AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) { + private AckHandler(String ackId, int outstandingBytes, Instant totalExpiration) { this.ackId = ackId; this.outstandingBytes = outstandingBytes; this.receivedTimeMillis = clock.millisTime(); @@ -182,7 +178,7 @@ public void onSuccess(AckReply reply) { } } - public interface AckProcessor { + interface AckProcessor { void sendAckOperations( List acksToSend, List ackDeadlineExtensions); } @@ -211,7 +207,7 @@ void sendAckOperations( this.clock = clock; } - public void start() { + void start() { final Runnable setExtendDeadline = new Runnable() { @Override @@ -264,7 +260,7 @@ public void run() { } } - public void stop() { + void stop() { messagesWaiter.waitNoMessages(); jobLock.lock(); try { @@ -288,17 +284,17 @@ int getMessageDeadlineSeconds() { return messageDeadlineSeconds.get(); } - static class OutstandingMessage { + private static class OutstandingMessage { private final ReceivedMessage receivedMessage; private final AckHandler ackHandler; - public OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { + private OutstandingMessage(ReceivedMessage receivedMessage, AckHandler ackHandler) { this.receivedMessage = receivedMessage; this.ackHandler = ackHandler; } } - public void processReceivedMessages(List messages) { + void processReceivedMessages(List messages) { Instant totalExpiration = now().plus(maxAckExtensionPeriod); List outstandingBatch = new ArrayList<>(messages.size()); for (ReceivedMessage message : messages) { diff --git a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java index c4a0651da6fa..cb238f3d34ca 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java +++ b/google-cloud-clients/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageWaiter.java @@ -17,7 +17,6 @@ package com.google.cloud.pubsub.v1; import com.google.api.core.InternalApi; -import java.util.concurrent.atomic.AtomicBoolean; /** A barrier kind of object that helps to keep track and synchronously wait on pending messages. */ class MessageWaiter { @@ -35,16 +34,10 @@ public synchronized void incrementPendingMessages(int messages) { } public synchronized void waitNoMessages() { - waitNoMessages(new AtomicBoolean()); - } - - @InternalApi - synchronized void waitNoMessages(AtomicBoolean waitReached) { boolean interrupted = false; try { while (pendingMessages > 0) { try { - waitReached.set(true); wait(); } catch (InterruptedException e) { // Ignored, uninterruptibly. diff --git a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java index 5588ebec59e1..5f8e19875e42 100644 --- a/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java +++ b/google-cloud-clients/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageWaiterTest.java @@ -18,7 +18,6 @@ import static org.junit.Assert.assertEquals; -import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -32,14 +31,13 @@ public void test() throws Exception { final MessageWaiter waiter = new MessageWaiter(); waiter.incrementPendingMessages(1); - final AtomicBoolean waitReached = new AtomicBoolean(); - + final Thread mainThread = Thread.currentThread(); Thread t = new Thread( new Runnable() { @Override public void run() { - while (!waitReached.get()) { + while (mainThread.getState() != Thread.State.WAITING) { Thread.yield(); } waiter.incrementPendingMessages(-1); @@ -47,7 +45,7 @@ public void run() { }); t.start(); - waiter.waitNoMessages(waitReached); + waiter.waitNoMessages(); t.join(); assertEquals(0, waiter.pendingMessages());