Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
Expand Down Expand Up @@ -708,6 +709,7 @@ void testConcurrentResetCursor() throws Exception {
final int Consumers = 5;

List<Future<AtomicBoolean>> futures = Lists.newArrayList();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);

Expand Down Expand Up @@ -1727,6 +1729,7 @@ void testReadEntriesOrWaitBlocking() throws Exception {
final int Consumers = 10;

List<Future<Void>> futures = Lists.newArrayList();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);

Expand Down Expand Up @@ -3465,7 +3468,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {

} finally {
factory2.shutdown();
}
}
});

factory1.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -1271,6 +1272,7 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
public void testConcurrentAsyncSetProperties() throws Exception {
final CountDownLatch latch = new CountDownLatch(1000);
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
final int finalI = i;
Expand Down Expand Up @@ -1302,7 +1304,6 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
fail(e.getMessage());
}
assertTrue(latch.await(300, TimeUnit.SECONDS));
executor.shutdown();
factory.shutdown();
}

Expand Down Expand Up @@ -1447,7 +1448,8 @@ public void testOpenRaceCondition() throws Exception {

final int N = 1000;
final Position position = ledger.addEntry("entry-0".getBytes());
Executor executor = Executors.newCachedThreadPool();
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
final CountDownLatch counter = new CountDownLatch(2);
executor.execute(() -> {
try {
Expand Down Expand Up @@ -2290,6 +2292,7 @@ public void testLazyRecoverCursor() throws Exception {

// Simulating time consuming cursor recovery.
CompletableFuture<Void> future = bkc.promiseAfter(2);
@Cleanup("shutdownNow")
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lazyCursorRecovery"));
scheduledExecutorService.schedule(() -> {
future.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void tearDown() throws Exception {
stopBKCluster();
// stop zookeeper service
stopZKCluster();
executor.shutdown();
executor.shutdownNow();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ public final void setUpClass() {
@AfterClass(alwaysRun = true)
public final void tearDownClass() {
if (executor != null) {
executor.shutdown();
executor.shutdownNow();
}
if (cachedExecutor != null) {
cachedExecutor.shutdown();
cachedExecutor.shutdownNow();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import lombok.Cleanup;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import org.apache.zookeeper.ZooKeeper.States;
import org.slf4j.ILoggerFactory;
Expand All @@ -52,6 +53,7 @@ public void run() {
+ service.getSafeWebServiceAddress() + ", broker url=" + service.getSafeBrokerServiceUrl());
}

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread"));

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,11 @@ public CompletableFuture<Void> closeAsync() {
transactionBufferClient.close();
}

if (transactionExecutor != null) {
transactionExecutor.shutdown();
transactionExecutor = null;
}

if (coordinationService != null) {
coordinationService.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ public void close() {
mockedBk.close();
} catch (BKException | InterruptedException ignored) {
}
executor.shutdown();
executor.shutdownNow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void createTenant(PulsarAdmin pulsarAdmin)
@AfterClass(alwaysRun = true)
public void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
executor.shutdownNow();
executor = null;

for (int i = 0; i < BROKER_COUNT; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1346,6 +1346,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}

@Cleanup("shutdownNow")
ExecutorService executorService = Executors.newCachedThreadPool();

try {
Expand Down Expand Up @@ -1403,7 +1404,6 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
}

producer.close();
executorService.shutdown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,6 +999,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}

@Cleanup("shutdownNow")
ExecutorService executorService = Executors.newCachedThreadPool();


Expand Down Expand Up @@ -1083,7 +1084,6 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
}

producer.close();
executorService.shutdownNow();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void setup() throws Exception {

@AfterMethod(alwaysRun = true)
public void teardown() throws Exception{
executor.shutdown();
executor.shutdownNow();
zkCache.stop();
zkc.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ void setup() throws Exception {

@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
executor.shutdown();
executor.shutdownNow();

admin1.close();
admin2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ void setup() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
executor.shutdownNow();

for (int i = 0; i < BROKER_COUNT; i++) {
pulsarAdmins[i].close();
Expand Down Expand Up @@ -609,7 +609,7 @@ private BundlesData getBundles(int numBundles) {
private void createNamespace(PulsarService pulsar, String namespace, int numBundles) throws Exception {
Policies policies = new Policies();
policies.bundles = getBundles(numBundles);
String zpath = AdminResource.path(POLICIES, namespace);
String zpath = AdminResource.path(POLICIES, namespace);
pulsar.getPulsarResources().getNamespaceResources().create(zpath, policies);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ void setup() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
executor.shutdownNow();

admin1.close();
admin2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ void setup() throws Exception {
@AfterMethod(alwaysRun = true)
void shutdown() throws Exception {
log.info("--- Shutting down ---");
executor.shutdown();
executor.shutdownNow();

admin1.close();
admin2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void setup() throws Exception {

@AfterMethod(alwaysRun = true)
public void teardown() throws Exception {
executor.shutdown();
executor.shutdownNow();
zkCache.stop();
zkc.close();
otherZkc.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Excepti
retryStrategically((test) -> dispatcher.getConsumers().get(0).getUnackedMessages() == 0, 50, 150);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);

executor.shutdown();
executor.shutdownNow();
myConsumer.close();
producer.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ public void testTopicFailureShouldNotHaveDeadLock() {
fail(e.getMessage());
}

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
BrokerService service = spy(pulsar.getBrokerService());
// create topic will fail to get managedLedgerConfig
Expand All @@ -873,8 +874,6 @@ public void testTopicFailureShouldNotHaveDeadLock() {
fail("there is a dead-lock and it should have been prevented");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof NullPointerException);
} finally {
executor.shutdownNow();
}
}

Expand All @@ -892,6 +891,7 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
fail(e.getMessage());
}

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newSingleThreadExecutor();
BrokerService service = spy(pulsar.getBrokerService());
// create topic will fail to get managedLedgerConfig
Expand Down Expand Up @@ -926,7 +926,6 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception {
} catch (ExecutionException e) {
assertEquals(e.getCause().getClass(), PersistenceException.class);
} finally {
executor.shutdownNow();
ledgers.clear();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
Expand Down Expand Up @@ -146,6 +147,7 @@ public void testLookupThrottlingForClientByBroker() throws Exception {
}

List<Consumer<byte[]>> successfulConsumers = Collections.synchronizedList(Lists.newArrayList());
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(10);
final int totalConsumers = 20;
CountDownLatch latch = new CountDownLatch(totalConsumers);
Expand All @@ -170,7 +172,6 @@ public void testLookupThrottlingForClientByBroker() throws Exception {
}
}
pulsarClient.close();
executor.shutdown();
assertNotEquals(successfulConsumers.size(), totalConsumers);
}

Expand Down Expand Up @@ -198,6 +199,7 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio
.ioThreads(20).connectionsPerBroker(20).build();
upsertLookupPermits(100);
List<Consumer<byte[]>> consumers = Collections.synchronizedList(Lists.newArrayList());
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(10);
final int totalConsumers = 8;
CountDownLatch latch = new CountDownLatch(totalConsumers);
Expand Down Expand Up @@ -231,8 +233,6 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio

}
assertEquals(totalConnectedConsumers, totalConsumers);

executor.shutdown();
pulsarClient.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Cleanup;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
Expand Down Expand Up @@ -82,6 +83,7 @@ public void concurrent() throws Exception {

CyclicBarrier barrier = new CyclicBarrier(Threads);
CountDownLatch counter = new CountDownLatch(Threads);
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();

List<String> results = Collections.synchronizedList(Lists.newArrayList());
Expand Down Expand Up @@ -112,8 +114,6 @@ public void concurrent() throws Exception {
// Check the list contains no duplicates
Set<String> set = Sets.newHashSet(results);
assertEquals(set.size(), results.size());

executor.shutdown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ public void testConcurrentConsumerThreads() throws Exception {
final int recvQueueSize = 100;
final int numConsumersThreads = 10;

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();

final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1);
Expand Down Expand Up @@ -376,7 +377,6 @@ public Void call() throws Exception {
// 2. flow control works the same as single consumer single thread
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertEquals(getAvailablePermits(subRef), recvQueueSize);
executor.shutdown();
}

@Test(enabled = false)
Expand All @@ -395,6 +395,7 @@ public void testGracefulClose() throws Exception {
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
assertNotNull(topicRef);

@Cleanup("shutdownNow")
ExecutorService executor = Executors.newCachedThreadPool();
CountDownLatch latch = new CountDownLatch(1);
executor.submit(() -> {
Expand Down Expand Up @@ -442,8 +443,6 @@ public void testGracefulClose() throws Exception {
consumer.close();
Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT);
assertTrue(subRef.getDispatcher().isConsumerConnected());

executor.shutdown();
}

@Test
Expand Down Expand Up @@ -1398,7 +1397,7 @@ public void testBrokerTopicStats() throws Exception {
field.setAccessible(true);
ScheduledExecutorService statsUpdater = (ScheduledExecutorService) field.get(brokerService);
// disable statsUpdate to calculate rates explicitly
statsUpdater.shutdown();
statsUpdater.shutdownNow();

final String namespace = "prop/ns-abc";
Producer<byte[]> producer = pulsarClient.newProducer()
Expand Down
Loading