From 7cbd1e832fd274feac1d5875daabbe2471395393 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 13 Apr 2021 13:23:50 +0300 Subject: [PATCH] Fix some ExecutorService leaks and use @Cleanup("shutdownNow") for cleanup - use ExecutorService.shutdownNow() instead of ExecutorService.shutdown() in tests --- .../mledger/impl/ManagedCursorTest.java | 5 +- .../mledger/impl/ManagedLedgerTest.java | 7 +- .../test/BookKeeperClusterTestCase.java | 2 +- .../test/MockedBookKeeperTestCase.java | 4 +- .../broker/MessagingServiceShutdownHook.java | 2 + .../apache/pulsar/broker/PulsarService.java | 5 + .../broker/MockedBookKeeperClientFactory.java | 2 +- .../pulsar/broker/SLAMonitoringTest.java | 2 +- .../pulsar/broker/admin/AdminApiTest.java | 2 +- .../broker/admin/v1/V1_AdminApiTest.java | 2 +- .../broker/cache/ResourceQuotaCacheTest.java | 2 +- .../AntiAffinityNamespaceGroupTest.java | 2 +- .../broker/loadbalance/LoadBalancerTest.java | 4 +- .../ModularLoadManagerImplTest.java | 2 +- .../SimpleLoadManagerImplTest.java | 2 +- .../broker/namespace/OwnershipCacheTest.java | 2 +- .../broker/service/BatchMessageTest.java | 2 +- .../broker/service/BrokerServiceTest.java | 5 +- .../service/BrokerServiceThrottlingTest.java | 6 +- .../service/DistributedIdGeneratorTest.java | 4 +- .../service/PersistentTopicE2ETest.java | 7 +- .../broker/service/PersistentTopicTest.java | 5 +- .../pulsar/broker/service/ReplicatorTest.java | 3 +- .../broker/service/ReplicatorTestBase.java | 2 +- .../zookeeper/ZooKeeperClientAspectJTest.java | 6 +- .../client/api/DeadLetterTopicTest.java | 3 +- .../api/DispatcherBlockConsumerTest.java | 5 +- .../client/api/NonPersistentTopicTest.java | 7 +- .../api/PartitionedProducerConsumerTest.java | 2 +- .../api/SimpleProducerConsumerTest.java | 6 +- .../api/v1/V1_ProducerConsumerTest.java | 8 +- .../impl/BrokerClientIntegrationTest.java | 4 +- .../client/impl/MessageChunkingTest.java | 70 +++--- .../impl/MessagePublishThrottlingTest.java | 3 +- .../client/impl/MessageRedeliveryTest.java | 223 +++++++++--------- .../client/impl/TopicsConsumerImplTest.java | 3 +- .../pulsar/io/PulsarFunctionE2ETest.java | 5 +- .../apache/pulsar/io/PulsarSinkE2ETest.java | 4 +- .../proxy/ProxyAuthenticationTest.java | 3 +- .../proxy/ProxyPublishConsumeTest.java | 3 +- .../proxy/ProxyPublishConsumeTlsTest.java | 3 +- .../ProxyPublishConsumeWithoutZKTest.java | 3 +- .../proxy/v1/V1_ProxyAuthenticationTest.java | 3 +- .../pulsar/admin/cli/utils/IOUtilsTest.java | 3 + .../client/cli/PulsarClientToolTest.java | 10 +- .../client/cli/PulsarClientToolWsTest.java | 121 +++++----- .../pulsar/common/util/FutureUtilTest.java | 18 +- .../ConcurrentLongHashMapTest.java | 9 +- .../ConcurrentLongPairSetTest.java | 9 +- .../ConcurrentOpenHashMapTest.java | 7 +- .../ConcurrentOpenHashSetTest.java | 7 +- .../ConcurrentSortedLongPairSetTest.java | 4 +- .../GrowablePriorityLongPairQueueTest.java | 7 +- .../functions/instance/JavaInstance.java | 4 +- .../functions/instance/JavaInstanceTest.java | 5 +- .../pulsar/functions/worker/Worker.java | 8 +- .../worker/SchedulerManagerTest.java | 4 +- .../test/BookKeeperClusterTestCase.java | 2 +- .../test/MockedBookKeeperTestCase.java | 4 +- .../test/MockedBookKeeperTestCase.java | 4 +- .../ZookeeperBkClientFactoryImplTest.java | 2 +- .../pulsar/zookeeper/ZookeeperCacheTest.java | 30 ++- 62 files changed, 362 insertions(+), 341 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 1d325f6c03d39..f706f3ac2562c 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -60,6 +60,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import lombok.Cleanup; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; @@ -708,6 +709,7 @@ void testConcurrentResetCursor() throws Exception { final int Consumers = 5; List> futures = Lists.newArrayList(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1); @@ -1727,6 +1729,7 @@ void testReadEntriesOrWaitBlocking() throws Exception { final int Consumers = 10; List> futures = Lists.newArrayList(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1); @@ -3465,7 +3468,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) { } finally { factory2.shutdown(); - } + } }); factory1.shutdown(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index f6262487b1659..25a031ba42874 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -63,6 +63,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; +import lombok.Cleanup; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; @@ -1271,6 +1272,7 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) public void testConcurrentAsyncSetProperties() throws Exception { final CountDownLatch latch = new CountDownLatch(1000); ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1)); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 1000; i++) { final int finalI = i; @@ -1302,7 +1304,6 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) fail(e.getMessage()); } assertTrue(latch.await(300, TimeUnit.SECONDS)); - executor.shutdown(); factory.shutdown(); } @@ -1447,7 +1448,8 @@ public void testOpenRaceCondition() throws Exception { final int N = 1000; final Position position = ledger.addEntry("entry-0".getBytes()); - Executor executor = Executors.newCachedThreadPool(); + @Cleanup("shutdownNow") + ExecutorService executor = Executors.newCachedThreadPool(); final CountDownLatch counter = new CountDownLatch(2); executor.execute(() -> { try { @@ -2290,6 +2292,7 @@ public void testLazyRecoverCursor() throws Exception { // Simulating time consuming cursor recovery. CompletableFuture future = bkc.promiseAfter(2); + @Cleanup("shutdownNow") ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lazyCursorRecovery")); scheduledExecutorService.schedule(() -> { future.complete(null); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java index ff0120310aa74..b64aaa794babb 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java @@ -126,7 +126,7 @@ public void tearDown() throws Exception { stopBKCluster(); // stop zookeeper service stopZKCluster(); - executor.shutdown(); + executor.shutdownNow(); } /** diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java index a2ad8459472b3..a424b8a977406 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java @@ -119,10 +119,10 @@ public final void setUpClass() { @AfterClass(alwaysRun = true) public final void tearDownClass() { if (executor != null) { - executor.shutdown(); + executor.shutdownNow(); } if (cachedExecutor != null) { - cachedExecutor.shutdown(); + cachedExecutor.shutdownNow(); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java index 319628db903cd..0addf1ee73aae 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.Consumer; +import lombok.Cleanup; import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService; import org.apache.zookeeper.ZooKeeper.States; import org.slf4j.ILoggerFactory; @@ -52,6 +53,7 @@ public void run() { + service.getSafeWebServiceAddress() + ", broker url=" + service.getSafeBrokerServiceUrl()); } + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread")); try { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 7235aef7366ac..4a000e471fdfc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -436,6 +436,11 @@ public CompletableFuture closeAsync() { transactionBufferClient.close(); } + if (transactionExecutor != null) { + transactionExecutor.shutdown(); + transactionExecutor = null; + } + if (coordinationService != null) { coordinationService.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java index 67fd80de6baa6..2ce299507403e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java @@ -74,6 +74,6 @@ public void close() { mockedBk.close(); } catch (BKException | InterruptedException ignored) { } - executor.shutdown(); + executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java index 52be0fb40ccb6..7e09ab9b493e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java @@ -126,7 +126,7 @@ private void createTenant(PulsarAdmin pulsarAdmin) @AfterClass(alwaysRun = true) public void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdown(); + executor.shutdownNow(); executor = null; for (int i = 0; i < BROKER_COUNT; i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index c87edbb62b4cb..622c4dc9ed338 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -1346,6 +1346,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]); } + @Cleanup("shutdownNow") ExecutorService executorService = Executors.newCachedThreadPool(); try { @@ -1403,7 +1404,6 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { } producer.close(); - executorService.shutdown(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 297e87d418d19..00e7e46bef355 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -999,6 +999,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]); } + @Cleanup("shutdownNow") ExecutorService executorService = Executors.newCachedThreadPool(); @@ -1083,7 +1084,6 @@ public void testNamespaceSplitBundleConcurrent() throws Exception { } producer.close(); - executorService.shutdownNow(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java index a01bfb1824bc5..9cd6d3e2fe350 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/cache/ResourceQuotaCacheTest.java @@ -74,7 +74,7 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void teardown() throws Exception{ - executor.shutdown(); + executor.shutdownNow(); zkCache.stop(); zkc.shutdown(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java index a258326c6bb2e..963ce6fa648a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AntiAffinityNamespaceGroupTest.java @@ -151,7 +151,7 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { - executor.shutdown(); + executor.shutdownNow(); admin1.close(); admin2.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java index 6f73fc5c0c9c4..84750da3e21dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/LoadBalancerTest.java @@ -164,7 +164,7 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdown(); + executor.shutdownNow(); for (int i = 0; i < BROKER_COUNT; i++) { pulsarAdmins[i].close(); @@ -609,7 +609,7 @@ private BundlesData getBundles(int numBundles) { private void createNamespace(PulsarService pulsar, String namespace, int numBundles) throws Exception { Policies policies = new Policies(); policies.bundles = getBundles(numBundles); - String zpath = AdminResource.path(POLICIES, namespace); + String zpath = AdminResource.path(POLICIES, namespace); pulsar.getPulsarResources().getNamespaceResources().create(zpath, policies); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java index d180695d7fe7b..3c6d0f4518200 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java @@ -183,7 +183,7 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdown(); + executor.shutdownNow(); admin1.close(); admin2.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java index 3fb15ee77875a..cced8104f35b3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java @@ -158,7 +158,7 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void shutdown() throws Exception { log.info("--- Shutting down ---"); - executor.shutdown(); + executor.shutdownNow(); admin1.close(); admin2.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java index 1036ecd77aac9..c0a7e35d12c94 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnershipCacheTest.java @@ -120,7 +120,7 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void teardown() throws Exception { - executor.shutdown(); + executor.shutdownNow(); zkCache.stop(); zkc.close(); otherZkc.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 9cbf754d7ce2a..d467d55df1cfd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -717,7 +717,7 @@ public void testConcurrentBatchMessageAck(BatcherBuilder builder) throws Excepti retryStrategically((test) -> dispatcher.getConsumers().get(0).getUnackedMessages() == 0, 50, 150); assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0); - executor.shutdown(); + executor.shutdownNow(); myConsumer.close(); producer.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index 3d454b7b52ead..f318b4241ddef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -849,6 +849,7 @@ public void testTopicFailureShouldNotHaveDeadLock() { fail(e.getMessage()); } + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); BrokerService service = spy(pulsar.getBrokerService()); // create topic will fail to get managedLedgerConfig @@ -873,8 +874,6 @@ public void testTopicFailureShouldNotHaveDeadLock() { fail("there is a dead-lock and it should have been prevented"); } catch (ExecutionException e) { assertTrue(e.getCause() instanceof NullPointerException); - } finally { - executor.shutdownNow(); } } @@ -892,6 +891,7 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception { fail(e.getMessage()); } + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); BrokerService service = spy(pulsar.getBrokerService()); // create topic will fail to get managedLedgerConfig @@ -926,7 +926,6 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception { } catch (ExecutionException e) { assertEquals(e.getCause().getClass(), PersistenceException.class); } finally { - executor.shutdownNow(); ledgers.clear(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java index a2f206766401f..8c6ba360f76ca 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceThrottlingTest.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.PulsarClient; @@ -146,6 +147,7 @@ public void testLookupThrottlingForClientByBroker() throws Exception { } List> successfulConsumers = Collections.synchronizedList(Lists.newArrayList()); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(10); final int totalConsumers = 20; CountDownLatch latch = new CountDownLatch(totalConsumers); @@ -170,7 +172,6 @@ public void testLookupThrottlingForClientByBroker() throws Exception { } } pulsarClient.close(); - executor.shutdown(); assertNotEquals(successfulConsumers.size(), totalConsumers); } @@ -198,6 +199,7 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio .ioThreads(20).connectionsPerBroker(20).build(); upsertLookupPermits(100); List> consumers = Collections.synchronizedList(Lists.newArrayList()); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(10); final int totalConsumers = 8; CountDownLatch latch = new CountDownLatch(totalConsumers); @@ -231,8 +233,6 @@ public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exceptio } assertEquals(totalConnectedConsumers, totalConsumers); - - executor.shutdown(); pulsarClient.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DistributedIdGeneratorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DistributedIdGeneratorTest.java index 3af044fd62e4b..0ad5bfc4f717e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DistributedIdGeneratorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/DistributedIdGeneratorTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import lombok.Cleanup; import org.apache.pulsar.metadata.api.MetadataStoreConfig; import org.apache.pulsar.metadata.api.coordination.CoordinationService; import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; @@ -82,6 +83,7 @@ public void concurrent() throws Exception { CyclicBarrier barrier = new CyclicBarrier(Threads); CountDownLatch counter = new CountDownLatch(Threads); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); List results = Collections.synchronizedList(Lists.newArrayList()); @@ -112,8 +114,6 @@ public void concurrent() throws Exception { // Check the list contains no duplicates Set set = Sets.newHashSet(results); assertEquals(set.size(), results.size()); - - executor.shutdown(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 2e55e3fa58dcd..302b78d59a32b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -334,6 +334,7 @@ public void testConcurrentConsumerThreads() throws Exception { final int recvQueueSize = 100; final int numConsumersThreads = 10; + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1); @@ -376,7 +377,6 @@ public Void call() throws Exception { // 2. flow control works the same as single consumer single thread Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); assertEquals(getAvailablePermits(subRef), recvQueueSize); - executor.shutdown(); } @Test(enabled = false) @@ -395,6 +395,7 @@ public void testGracefulClose() throws Exception { PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(1); executor.submit(() -> { @@ -442,8 +443,6 @@ public void testGracefulClose() throws Exception { consumer.close(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); assertTrue(subRef.getDispatcher().isConsumerConnected()); - - executor.shutdown(); } @Test @@ -1398,7 +1397,7 @@ public void testBrokerTopicStats() throws Exception { field.setAccessible(true); ScheduledExecutorService statsUpdater = (ScheduledExecutorService) field.get(brokerService); // disable statsUpdate to calculate rates explicitly - statsUpdater.shutdown(); + statsUpdater.shutdownNow(); final String namespace = "prop/ns-abc"; Producer producer = pulsarClient.newProducer() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index e17fe33ed9a2f..0a98b94674bef 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -65,6 +65,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -906,6 +907,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } }).when(ledgerMock).asyncDeleteCursor(matches(".*success.*"), any(DeleteCursorCallback.class), any()); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(() -> { @@ -920,7 +922,6 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } catch (BrokerServiceException e) { assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException); } - executor.shutdown(); } @Test @@ -1168,6 +1169,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { } }).when(ledgerMock).asyncDelete(any(DeleteLedgerCallback.class), any()); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); executor.submit(() -> { @@ -1205,7 +1207,6 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { assertTrue(ee.getCause() instanceof BrokerServiceException.TopicFencedException); // Expected } - executor.shutdown(); } @SuppressWarnings("unchecked") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 7a48adbe03a36..7f05524e4e01b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -258,6 +258,7 @@ public void testConcurrentReplicator() throws Exception { replicationClients.put("r3", pulsarClient); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { executor.submit(() -> { @@ -274,8 +275,6 @@ public void testConcurrentReplicator() throws Exception { .createProducerAsync( Mockito.any(ProducerConfigurationData.class), Mockito.any(Schema.class), eq(null)); - - executor.shutdown(); } @DataProvider(name = "namespace") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index ee9377ba3491e..20b98e0884b0c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -245,7 +245,7 @@ protected void cleanup() throws Exception { markCurrentSetupNumberCleaned(); log.info("--- Shutting down ---"); if (executor != null) { - executor.shutdown(); + executor.shutdownNow(); executor = null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java index a85159115fe3f..b5e59eed11835 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/zookeeper/ZooKeeperClientAspectJTest.java @@ -96,7 +96,7 @@ public void testZkConnected() throws Exception { localZkc.close(); } - executor.shutdown(); + executor.shutdownNow(); } } @@ -195,7 +195,7 @@ public void recordLatency(EventType eventType, long latencyMiliSecond) { localZkc.close(); } - executor.shutdown(); + executor.shutdownNow(); } } @@ -282,7 +282,7 @@ public void testZkOpStatsMetrics() throws Exception { localZkc.close(); } - executor.shutdown(); + executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java index 18679de19e35d..c26fd4fce846b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; import org.apache.pulsar.client.util.RetryMessageUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,6 +192,7 @@ public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { //1 start 3 parallel consumers List> consumers = new ArrayList<>(); final AtomicInteger totalReceived = new AtomicInteger(0); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(consumerCount); for (int i = 0; i < consumerCount; i++) { executor.execute(() -> { @@ -245,7 +247,6 @@ public void testDuplicatedMessageSendToDeadLetterTopic() throws Exception { assertEquals(totalInDeadLetter, messageCount); //6 clean up - executor.shutdownNow(); producer.close(); deadLetterConsumer.close(); for (Consumer consumer : consumers) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index b715b1028c71e..ed56901add0b7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -43,6 +43,7 @@ import java.util.stream.Collectors; +import lombok.Cleanup; import org.apache.pulsar.broker.namespace.NamespaceService; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -683,6 +684,7 @@ public void testBlockBrokerDispatching() { double unAckedMessagePercentage = pulsar.getConfiguration() .getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(); + @Cleanup("shutdownNow") ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); try { @@ -863,7 +865,6 @@ public void testBlockBrokerDispatching() { } finally { pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(unAckedMessages); pulsar.getConfiguration().setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(unAckedMessagePercentage); - executor.shutdownNow(); } } @@ -885,6 +886,7 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() { log.info("-- Starting {} test --", methodName); + @Cleanup("shutdownNow") ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); int unAckedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker(); @@ -1032,7 +1034,6 @@ public void testBrokerDispatchBlockAndSubAckBackRequiredMsgs() { } finally { pulsar.getConfiguration().setMaxUnackedMessagesPerBroker(unAckedMessages); pulsar.getConfiguration().setMaxUnackedMessagesPerSubscriptionOnBrokerBlocked(unAckedMessagePercentage); - executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index e416a3eea6b20..1a90f223bb7e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import lombok.Cleanup; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.LoadManager; @@ -347,6 +348,7 @@ public void testProducerRateLimit() throws Exception { stopBroker(); startBroker(); // produce message concurrently + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(5); AtomicBoolean failed = new AtomicBoolean(false); Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("subscriber-1") @@ -384,7 +386,6 @@ public void testProducerRateLimit() throws Exception { // but as message should be dropped at broker: broker should not receive the message assertNotEquals(messageSet.size(), totalProduceMessages); - executor.shutdown(); producer.close(); } finally { conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate); @@ -849,6 +850,7 @@ public void testMsgDropStat() throws Exception { .enableBatching(false) .messageRoutingMode(MessageRoutingMode.SinglePartition) .create(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(5); byte[] msgData = "testData".getBytes(); final int totalProduceMessages = 200; @@ -876,7 +878,6 @@ public void testMsgDropStat() throws Exception { producer.close(); consumer.close(); consumer2.close(); - executor.shutdown(); } finally { conf.setMaxConcurrentNonPersistentMessagePerConnection(defaultNonPersistentMessageRate); } @@ -1032,7 +1033,7 @@ private int inSec(int time, TimeUnit unit) { void shutdownReplicationCluster() throws Exception { log.info("--- Shutting down ---"); - executor.shutdown(); + executor.shutdownNow(); admin1.close(); admin2.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index cb717f55c5297..e93adaeeaf147 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -68,7 +68,7 @@ protected void setup() throws Exception { @Override protected void cleanup() throws Exception { super.internalCleanup(); - executor.shutdown(); + executor.shutdownNow(); } @Test(timeOut = 30000) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 3250eea2ecfa3..061c4d4ba6db0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -750,6 +750,7 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) .topic("persistent://my-property/my-ns/my-topic7").subscriptionName(subName) .startMessageIdInclusive() .receiverQueueSize(recvQueueSize).subscribe(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1); @@ -837,7 +838,6 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads); }); consumer.close(); - executor.shutdown(); } @Test @@ -1146,6 +1146,7 @@ public void testAsyncProducerAndConsumer() throws Exception { log.info(" start receiving messages :"); CountDownLatch latch = new CountDownLatch(totalMsg); // receive messages + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(1); receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor); @@ -1160,7 +1161,6 @@ public void testAsyncProducerAndConsumer() throws Exception { producer.close(); consumer.close(); log.info("-- Exiting {} test --", methodName); - executor.shutdown(); } @Test(timeOut = 5000) @@ -1185,6 +1185,7 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception { log.info(" start receiving messages :"); CountDownLatch latch = new CountDownLatch(totalMsg); // receive messages + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(1); receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor); @@ -1199,7 +1200,6 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception { producer.close(); consumer.close(); log.info("-- Exiting {} test --", methodName); - executor.shutdown(); } @Test diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index 2dc34a1bdc7f0..744ed8782555e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -505,6 +506,7 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs) .subscriptionName(subName) .startMessageIdInclusive() .receiverQueueSize(recvQueueSize).subscribe(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final CyclicBarrier barrier = new CyclicBarrier(numConsumersThreads + 1); @@ -597,8 +599,6 @@ public Void call() throws Exception { Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads)); Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads); consumer.close(); - - executor.shutdownNow(); } @Test @@ -760,6 +760,7 @@ public void testAsyncProducerAndConsumer() throws Exception { log.info(" start receiving messages :"); CountDownLatch latch = new CountDownLatch(totalMsg); // receive messages + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(1); receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor); @@ -773,7 +774,6 @@ public void testAsyncProducerAndConsumer() throws Exception { producer.close(); consumer.close(); - executor.shutdownNow(); log.info("-- Exiting {} test --", methodName); } @@ -804,6 +804,7 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception { log.info(" start receiving messages :"); CountDownLatch latch = new CountDownLatch(totalMsg); // receive messages + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(1); receiveAsync(consumer, totalMsg, 0, latch, consumeMsgs, executor); @@ -817,7 +818,6 @@ public void testAsyncProducerAndConsumerWithZeroQueueSize() throws Exception { producer.close(); consumer.close(); - executor.shutdownNow(); log.info("-- Exiting {} test --", methodName); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 14fce51eb6214..9b1fc4b693af6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -535,6 +535,7 @@ public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { final String topicName = "persistent://prop/usw/my-ns/newTopic"; final int maxConccurentLookupRequest = pulsar.getConfiguration().getMaxConcurrentLookupRequest(); final int concurrentLookupRequests = 20; + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests); try { stopBroker(); @@ -582,7 +583,6 @@ public void testCloseConnectionOnBrokerRejectedRequest() throws Exception { assertEquals(failed.get(), 1); } finally { conf.setMaxConcurrentLookupRequest(maxConccurentLookupRequest); - executor.shutdownNow(); } } @@ -607,6 +607,7 @@ public void testMaxConcurrentTopicLoading() throws Exception { final String topicName = "persistent://prop/usw/my-ns/cocurrentLoadingTopic"; int concurrentTopic = pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(); final int concurrentLookupRequests = 20; + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(concurrentLookupRequests); try { @@ -652,7 +653,6 @@ public void testMaxConcurrentTopicLoading() throws Exception { } finally { // revert back to original value pulsar.getConfiguration().setMaxConcurrentTopicLoadRequest(concurrentTopic); - executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java index aa977b939b4a4..77d987bb662d0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageChunkingTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -259,48 +260,45 @@ public void testMaxPendingChunkMessages() throws Exception { final int totalMessages = 25; final String topicName = "persistent://my-property/my-ns/maxPending"; final int totalProducers = 25; + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(totalProducers); - try { - ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer().topic(topicName) - .subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, TimeUnit.SECONDS) - .maxPendingChuckedMessage(1).autoAckOldestChunkedMessageOnQueueFull(true) - .ackTimeout(5, TimeUnit.SECONDS).subscribe(); - - ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topicName); - - Producer[] producers = new Producer[totalProducers]; - int totalPublishedMessages = totalProducers; - List> futures = Lists.newArrayList(); - for (int i = 0; i < totalProducers; i++) { - producers[i] = producerBuilder.enableChunking(true).enableBatching(false).create(); - int index = i; - executor.submit(() -> { - futures.add(producers[index].sendAsync(createMessagePayload(45).getBytes())); - }); - } + ConsumerImpl consumer = (ConsumerImpl) pulsarClient.newConsumer().topic(topicName) + .subscriptionName("my-subscriber-name").acknowledgmentGroupTime(0, TimeUnit.SECONDS) + .maxPendingChuckedMessage(1).autoAckOldestChunkedMessageOnQueueFull(true) + .ackTimeout(5, TimeUnit.SECONDS).subscribe(); - FutureUtil.waitForAll(futures).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); - - Message msg = null; - Set messageSet = Sets.newHashSet(); - for (int i = 0; i < totalMessages; i++) { - msg = consumer.receive(1, TimeUnit.SECONDS); - if (msg == null) { - break; - } - String receivedMessage = new String(msg.getData()); - log.info("Received message: [{}]", receivedMessage); - messageSet.add(receivedMessage); - consumer.acknowledge(msg); - } + ProducerBuilder producerBuilder = pulsarClient.newProducer().topic(topicName); + + Producer[] producers = new Producer[totalProducers]; + int totalPublishedMessages = totalProducers; + List> futures = Lists.newArrayList(); + for (int i = 0; i < totalProducers; i++) { + producers[i] = producerBuilder.enableChunking(true).enableBatching(false).create(); + int index = i; + executor.submit(() -> { + futures.add(producers[index].sendAsync(createMessagePayload(45).getBytes())); + }); + } - assertNotEquals(messageSet.size(), totalPublishedMessages); - } finally { - executor.shutdown(); + FutureUtil.waitForAll(futures).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topicName).get().get(); + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 0; i < totalMessages; i++) { + msg = consumer.receive(1, TimeUnit.SECONDS); + if (msg == null) { + break; + } + String receivedMessage = new String(msg.getData()); + log.info("Received message: [{}]", receivedMessage); + messageSet.add(receivedMessage); + consumer.acknowledge(msg); } + assertNotEquals(messageSet.size(), totalPublishedMessages); + } /** diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java index 281445cc03c52..342dddde119fd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessagePublishThrottlingTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; import org.apache.pulsar.broker.service.Producer; import org.apache.pulsar.broker.service.PublishRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -452,6 +453,7 @@ public void testBrokerTopicPublishByteThrottling() throws Exception { } List> topicRatesCounter = Lists.newArrayListWithExpectedSize(3); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); final AtomicDouble topicsRateIn = new AtomicDouble(0); final AtomicInteger index = new AtomicInteger(0); @@ -529,6 +531,5 @@ public void testBrokerTopicPublishByteThrottling() throws Exception { assertTrue(rateIn > numMessage * msgBytes); producer.close(); - executor.shutdown(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java index 6840db9318c4c..8b430e7b2fa8f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/MessageRedeliveryTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import lombok.Cleanup; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -77,7 +78,7 @@ public static Object[][] useOpenRangeSet() { /** * It tests that ManagedCursor tracks individually deleted messages and markDeletePosition correctly with different * range-set implementation and re-delivers messages as expected. - * + * * @param useOpenRangeSet * @throws Exception */ @@ -87,138 +88,136 @@ public void testRedelivery(boolean useOpenRangeSet) throws Exception { this.conf.setManagedLedgerMaxEntriesPerLedger(5); this.conf.setManagedLedgerMinLedgerRolloverTimeMinutes(0); this.conf.setManagedLedgerUnackedRangesOpenCacheSetEnabled(useOpenRangeSet); + @Cleanup("shutdownNow") final ScheduledExecutorService executor = Executors.newScheduledThreadPool(20, new DefaultThreadFactory("pulsar")); - try { - final String ns1 = "my-property/brok-ns1"; - final String subName = "my-subscriber-name"; - final int numMessages = 50; - admin.namespaces().createNamespace(ns1, Sets.newHashSet("test")); - - final String topic1 = "persistent://" + ns1 + "/my-topic"; - - ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.newConsumer().topic(topic1) - .subscriptionName(subName).subscriptionType(SubscriptionType.Shared).receiverQueueSize(10) - .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS).subscribe(); - ConsumerImpl consumer2 = (ConsumerImpl) pulsarClient.newConsumer().topic(topic1) - .subscriptionName(subName).subscriptionType(SubscriptionType.Shared).receiverQueueSize(10) - .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS).subscribe(); - ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer().topic(topic1).create(); - - for (int i = 0; i < numMessages; i++) { - String message = "my-message-" + i; - producer.send(message.getBytes()); - } + final String ns1 = "my-property/brok-ns1"; + final String subName = "my-subscriber-name"; + final int numMessages = 50; + admin.namespaces().createNamespace(ns1, Sets.newHashSet("test")); + + final String topic1 = "persistent://" + ns1 + "/my-topic"; + + ConsumerImpl consumer1 = (ConsumerImpl) pulsarClient.newConsumer().topic(topic1) + .subscriptionName(subName).subscriptionType(SubscriptionType.Shared).receiverQueueSize(10) + .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS).subscribe(); + ConsumerImpl consumer2 = (ConsumerImpl) pulsarClient.newConsumer().topic(topic1) + .subscriptionName(subName).subscriptionType(SubscriptionType.Shared).receiverQueueSize(10) + .acknowledgmentGroupTime(0, TimeUnit.MILLISECONDS).subscribe(); + ProducerImpl producer = (ProducerImpl) pulsarClient.newProducer().topic(topic1).create(); + + for (int i = 0; i < numMessages; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } - CountDownLatch latch = new CountDownLatch(numMessages); - AtomicBoolean consume1 = new AtomicBoolean(true); - AtomicBoolean consume2 = new AtomicBoolean(true); - Set ackedMessages = Sets.newConcurrentHashSet(); - AtomicInteger counter = new AtomicInteger(0); - - // (1) ack alternate message from consumer-1 which creates ack-hole. - executor.submit(() -> { - while (true) { - try { - Message msg = consumer1.receive(1000, TimeUnit.MILLISECONDS); - if (msg != null) { - if (counter.getAndIncrement() % 2 == 0) { - try { - consumer1.acknowledge(msg); - // ack alternate messages - ackedMessages.add(new String(msg.getData())); - } catch (PulsarClientException e1) { - log.warn("Failed to ack message {}", e1.getMessage()); - } + CountDownLatch latch = new CountDownLatch(numMessages); + AtomicBoolean consume1 = new AtomicBoolean(true); + AtomicBoolean consume2 = new AtomicBoolean(true); + Set ackedMessages = Sets.newConcurrentHashSet(); + AtomicInteger counter = new AtomicInteger(0); + + // (1) ack alternate message from consumer-1 which creates ack-hole. + executor.submit(() -> { + while (true) { + try { + Message msg = consumer1.receive(1000, TimeUnit.MILLISECONDS); + if (msg != null) { + if (counter.getAndIncrement() % 2 == 0) { + try { + consumer1.acknowledge(msg); + // ack alternate messages + ackedMessages.add(new String(msg.getData())); + } catch (PulsarClientException e1) { + log.warn("Failed to ack message {}", e1.getMessage()); } - } else { - break; } - } catch (PulsarClientException e2) { - // Ok + } else { break; } - latch.countDown(); + } catch (PulsarClientException e2) { + // Ok + break; } - }); - - // (2) ack all the consumed messages from consumer-2 - executor.submit(() -> { - while (consume2.get()) { - try { - Message msg = consumer2.receive(1000, TimeUnit.MILLISECONDS); - if (msg != null) { - consumer2.acknowledge(msg); - // ack alternate messages - ackedMessages.add(new String(msg.getData())); - } else { - break; - } - } catch (PulsarClientException e2) { - // Ok + latch.countDown(); + } + }); + + // (2) ack all the consumed messages from consumer-2 + executor.submit(() -> { + while (consume2.get()) { + try { + Message msg = consumer2.receive(1000, TimeUnit.MILLISECONDS); + if (msg != null) { + consumer2.acknowledge(msg); + // ack alternate messages + ackedMessages.add(new String(msg.getData())); + } else { break; } - latch.countDown(); + } catch (PulsarClientException e2) { + // Ok + break; } - }); - - latch.await(10000, TimeUnit.MILLISECONDS); - consume1.set(false); - - // (3) sleep so, consumer2 should timeout on it's pending read operation and not consume more messages - Thread.sleep(1000); - - // (4) here we consume all messages but consumer1 only acked alternate messages. - assertNotEquals(ackedMessages.size(), numMessages); - - PersistentTopic pTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topic1).get() - .get(); - ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger(); - ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); - - // (5) now, close consumer1 and let broker deliver consumer1's unack messages to consumer2 - consumer1.close(); - - // (6) broker should redeliver all unack messages of consumer-1 and consumer-2 should ack all of them - CountDownLatch latch2 = new CountDownLatch(1); - executor.submit(() -> { - while (true) { - try { - Message msg = consumer2.receive(1000, TimeUnit.MILLISECONDS); - if (msg != null) { - consumer2.acknowledge(msg); - // ack alternate messages - ackedMessages.add(new String(msg.getData())); - } else { - break; - } - } catch (PulsarClientException e2) { - // Ok + latch.countDown(); + } + }); + + latch.await(10000, TimeUnit.MILLISECONDS); + consume1.set(false); + + // (3) sleep so, consumer2 should timeout on it's pending read operation and not consume more messages + Thread.sleep(1000); + + // (4) here we consume all messages but consumer1 only acked alternate messages. + assertNotEquals(ackedMessages.size(), numMessages); + + PersistentTopic pTopic = (PersistentTopic) this.pulsar.getBrokerService().getTopicIfExists(topic1).get() + .get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) pTopic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); + + // (5) now, close consumer1 and let broker deliver consumer1's unack messages to consumer2 + consumer1.close(); + + // (6) broker should redeliver all unack messages of consumer-1 and consumer-2 should ack all of them + CountDownLatch latch2 = new CountDownLatch(1); + executor.submit(() -> { + while (true) { + try { + Message msg = consumer2.receive(1000, TimeUnit.MILLISECONDS); + if (msg != null) { + consumer2.acknowledge(msg); + // ack alternate messages + ackedMessages.add(new String(msg.getData())); + } else { break; } - if (ackedMessages.size() == numMessages) - latch2.countDown(); + } catch (PulsarClientException e2) { + // Ok + break; } + if (ackedMessages.size() == numMessages) + latch2.countDown(); + } - }); + }); - latch2.await(20000, TimeUnit.MILLISECONDS); + latch2.await(20000, TimeUnit.MILLISECONDS); - consumer2.close(); + consumer2.close(); - assertEquals(ackedMessages.size(), numMessages); + assertEquals(ackedMessages.size(), numMessages); - // (7) acked message set should be empty - assertEquals(cursor.getIndividuallyDeletedMessagesSet().size(), 0); + // (7) acked message set should be empty + assertEquals(cursor.getIndividuallyDeletedMessagesSet().size(), 0); - // markDelete position should be one position behind read position - assertEquals(cursor.getReadPosition(), cursor.getMarkDeletedPosition().getNext()); + // markDelete position should be one position behind read position + assertEquals(cursor.getReadPosition(), cursor.getMarkDeletedPosition().getNext()); + + producer.close(); + consumer2.close(); - producer.close(); - consumer2.close(); - } finally { - executor.shutdown(); - } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 15abac6c5f023..0c038d973a2ee 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.netty.util.Timeout; +import lombok.Cleanup; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.client.admin.PulsarAdminException; @@ -283,6 +284,7 @@ public void testAsyncConsumer() throws Exception { log.info("start async consume"); CountDownLatch latch = new CountDownLatch(totalMessages); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newFixedThreadPool(1); executor.execute(() -> IntStream.range(0, totalMessages).forEach(index -> consumer.receiveAsync() @@ -310,7 +312,6 @@ public void testAsyncConsumer() throws Exception { producer1.close(); producer2.close(); producer3.close(); - executor.shutdownNow(); } @Test(timeOut = testTimeout) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index e56d4f4bceff0..ea580eb52ca6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -40,6 +40,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -69,7 +70,7 @@ */ @Test(groups = "broker-io") public class PulsarFunctionE2ETest extends AbstractPulsarE2ETest { - + protected static FunctionConfig createFunctionConfig(String tenant, String namespace, String functionName, String sourceTopic, String sinkTopic, String subscriptionName) { String sourceTopicPattern = String.format("persistent://%s/%s/%s", tenant, namespace, sourceTopic); @@ -209,6 +210,7 @@ public void testReadCompactedFunction() throws Exception { expected.put(key, value); } // 3 Trigger compaction + @Cleanup("shutdownNow") ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config, @@ -243,7 +245,6 @@ public void testReadCompactedFunction() throws Exception { Assert.assertEquals(count, maxKeys); Assert.assertTrue(expected.isEmpty()); - compactionScheduler.shutdownNow(); consumer.close(); producer.close(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 02994640fccc4..0d044fa0e33b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -90,6 +90,7 @@ public void testReadCompactedSink() throws Exception { expected.put(key, value); } // 3 Trigger compaction + @Cleanup("shutdownNow") ScheduledExecutorService compactionScheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("compactor").setDaemon(true).build()); TwoPhaseCompactor twoPhaseCompactor = new TwoPhaseCompactor(config, @@ -117,7 +118,6 @@ public void testReadCompactedSink() throws Exception { } }, 50, 1000); - compactionScheduler.shutdownNow(); producer.close(); } @@ -448,5 +448,5 @@ private static SinkConfig createSinkConfig(String tenant, String namespace, Stri sinkConfig.setCleanupSubscription(true); return sinkConfig; } - + } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java index 300fdbd593589..5741a5eb0e648 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyAuthenticationTest.java @@ -39,6 +39,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import lombok.Cleanup; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.websocket.WebSocketService; @@ -94,6 +95,7 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void cleanup() throws Exception { + @Cleanup("shutdownNow") ExecutorService executor = newFixedThreadPool(1); try { executor.submit(() -> { @@ -108,7 +110,6 @@ public void cleanup() throws Exception { } catch (Exception e) { log.error("failed to close clients ", e); } - executor.shutdownNow(); super.internalCleanup(); if (service != null) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index fac22bfa5e3e4..bbe22812a08e1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -50,6 +50,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; @@ -825,6 +826,7 @@ private void verifyProxyStats(Client client, String baseUrl, String topic) { } private void stopWebSocketClient(WebSocketClient... clients) { + @Cleanup("shutdownNow") ExecutorService executor = newFixedThreadPool(1); try { executor.submit(() -> { @@ -840,7 +842,6 @@ private void stopWebSocketClient(WebSocketClient... clients) { } catch (Exception e) { log.error("failed to close proxy clients", e); } - executor.shutdownNow(); } private static final Logger log = LoggerFactory.getLogger(ProxyPublishConsumeTest.class); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java index 9ba131c67c5b7..4c82615aa7fc9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTlsTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.client.api.TlsProducerConsumerBase; import org.apache.pulsar.client.impl.auth.AuthenticationTls; import org.apache.pulsar.common.util.SecurityUtility; @@ -133,6 +134,7 @@ public void socketTest() throws GeneralSecurityException { log.error(t.getMessage()); Assert.fail(t.getMessage()); } finally { + @Cleanup("shutdownNow") ExecutorService executor = newFixedThreadPool(1); try { executor.submit(() -> { @@ -147,7 +149,6 @@ public void socketTest() throws GeneralSecurityException { } catch (Exception e) { log.error("failed to close clients ", e); } - executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java index 2f6a623ca82bc..485f23bdeb165 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeWithoutZKTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.websocket.WebSocketService; @@ -114,6 +115,7 @@ public void socketTest() throws Exception { Assert.assertTrue(produceSocket.getBuffer().size() > 0); Assert.assertEquals(produceSocket.getBuffer(), consumeSocket.getBuffer()); } finally { + @Cleanup("shutdownNow") ExecutorService executor = newFixedThreadPool(1); try { executor.submit(() -> { @@ -128,7 +130,6 @@ public void socketTest() throws Exception { } catch (Exception e) { log.error("failed to close clients ", e); } - executor.shutdownNow(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java index 0ccb4b557723e..d315a10c46a28 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/v1/V1_ProxyAuthenticationTest.java @@ -39,6 +39,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import lombok.Cleanup; import org.apache.pulsar.client.api.v1.V1_ProducerConsumerBase; import org.apache.pulsar.metadata.impl.ZKMetadataStore; import org.apache.pulsar.websocket.WebSocketService; @@ -96,6 +97,7 @@ public void setup() throws Exception { @AfterMethod(alwaysRun = true) public void cleanup() throws Exception { + @Cleanup("shutdownNow") ExecutorService executor = newFixedThreadPool(1); try { executor.submit(() -> { @@ -110,7 +112,6 @@ public void cleanup() throws Exception { } catch (Exception e) { log.error("failed to close clients ", e); } - executor.shutdownNow(); super.internalCleanup(); if (service != null) { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java index 11901c685456b..007c1c883b28d 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/utils/IOUtilsTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -102,6 +103,7 @@ public void test5() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); System.setOut(new PrintStream(baos)); System.setIn(new ByteArrayInputStream(data.getBytes())); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); @SuppressWarnings("unchecked") Future future = (Future) executor.submit(() -> { @@ -128,6 +130,7 @@ public void test6() { ByteArrayOutputStream baos = new ByteArrayOutputStream(); System.setOut(new PrintStream(baos)); System.setIn(new ByteArrayInputStream(data.getBytes(StandardCharsets.UTF_8))); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); @SuppressWarnings("unchecked") Future future = (Future) executor.submit(() -> { diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java index 8e72103bafb17..8d82794d2863e 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java @@ -29,6 +29,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import lombok.Cleanup; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -68,6 +69,7 @@ public void testInitialization() throws InterruptedException, ExecutionException int numberOfMessages = 10; + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture future = new CompletableFuture(); @@ -103,7 +105,6 @@ public void testInitialization() throws InterruptedException, ExecutionException Assert.assertEquals(pulsarClientToolProducer.run(args), 0); future.get(); - executor.shutdown(); } @Test(timeOut = 20000) @@ -116,6 +117,7 @@ public void testNonDurableSubscribe() throws Exception { final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString(); int numberOfMessages = 10; + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture future = new CompletableFuture<>(); executor.execute(() -> { @@ -147,7 +149,6 @@ public void testNonDurableSubscribe() throws Exception { Assert.assertEquals(pulsarClientToolProducer.run(args), 0); Assert.assertFalse(future.isCompletedExceptionally()); future.get(); - executor.shutdown(); while (true) { try { @@ -171,6 +172,7 @@ public void testDurableSubscribe() throws Exception { final String topicName = "persistent://prop/ns-abc/test/topic-" + UUID.randomUUID().toString(); int numberOfMessages = 10; + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture future = new CompletableFuture<>(); executor.execute(() -> { @@ -204,7 +206,6 @@ public void testDurableSubscribe() throws Exception { Assert.assertEquals(pulsarClientToolProducer.run(args), 0); Assert.assertFalse(future.isCompletedExceptionally()); future.get(); - executor.shutdown(); //wait for close Thread.sleep(2000); List subscriptions = admin.topics().getSubscriptions(topicName); @@ -222,6 +223,7 @@ public void testEncryption() throws Exception { final String keyUriBase = "file:../pulsar-broker/src/test/resources/certificate/"; final int numberOfMessages = 10; + @Cleanup("shutdownNow") ExecutorService executor = Executors.newSingleThreadExecutor(); CompletableFuture future = new CompletableFuture<>(); executor.execute(() -> { @@ -257,8 +259,6 @@ public void testEncryption() throws Exception { Assert.assertFalse(future.isCompletedExceptionally()); } catch (Exception e) { Assert.fail("consumer was unable to decrypt messages", e); - } finally { - executor.shutdown(); } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java index f73aa0f3642bd..c5e65ba0c30f3 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java @@ -19,6 +19,7 @@ package org.apache.pulsar.client.cli; import java.time.Duration; +import lombok.Cleanup; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.websocket.WebSocketService; import org.apache.pulsar.websocket.service.ProxyServer; @@ -61,35 +62,37 @@ public void testWebSocketNonDurableSubscriptionMode() throws Exception { final String topicName = "persistent://my-property/my-ns/test/topic-" + UUID.randomUUID(); int numberOfMessages = 10; - ExecutorService executor = Executors.newSingleThreadExecutor(); - CompletableFuture future = new CompletableFuture<>(); - executor.execute(() -> { - try { - PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties); - String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n", - Integer.toString(numberOfMessages), "--hex", "-m", "NonDurable", "-r", "30", topicName}; - Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); - future.complete(null); - } catch (Throwable t) { - future.completeExceptionally(t); - } - }); - - // Make sure subscription has been created - Awaitility.await() - .pollInterval(Duration.ofMillis(200)) - .ignoreExceptions().untilAsserted(() -> { - Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 1); - }); - - PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); - - String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", - "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName}; - Assert.assertEquals(pulsarClientToolProducer.run(args), 0); - future.get(); - Assert.assertFalse(future.isCompletedExceptionally()); - executor.shutdown(); + { + @Cleanup("shutdown") + ExecutorService executor = Executors.newSingleThreadExecutor(); + CompletableFuture future = new CompletableFuture<>(); + executor.execute(() -> { + try { + PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties); + String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n", + Integer.toString(numberOfMessages), "--hex", "-m", "NonDurable", "-r", "30", topicName}; + Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); + future.complete(null); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + + // Make sure subscription has been created + Awaitility.await() + .pollInterval(Duration.ofMillis(200)) + .ignoreExceptions().untilAsserted(() -> { + Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 1); + }); + + PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); + + String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", + "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName}; + Assert.assertEquals(pulsarClientToolProducer.run(args), 0); + future.get(); + Assert.assertFalse(future.isCompletedExceptionally()); + } Awaitility.await() .ignoreExceptions().untilAsserted(() -> { @@ -106,35 +109,37 @@ public void testWebSocketDurableSubscriptionMode() throws Exception { final String topicName = "persistent://my-property/my-ns/test/topic-" + UUID.randomUUID(); int numberOfMessages = 10; - ExecutorService executor = Executors.newSingleThreadExecutor(); - CompletableFuture future = new CompletableFuture<>(); - executor.execute(() -> { - try { - PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties); - String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n", - Integer.toString(numberOfMessages), "--hex", "-m", "Durable", "-r", "30", topicName}; - Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); - future.complete(null); - } catch (Throwable t) { - future.completeExceptionally(t); - } - }); - - // Make sure subscription has been created - Awaitility.await() - .pollInterval(Duration.ofMillis(200)) - .ignoreExceptions().untilAsserted(() -> { - Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 1); - }); - - PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); - - String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", - "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName}; - Assert.assertEquals(pulsarClientToolProducer.run(args), 0); - future.get(); - Assert.assertFalse(future.isCompletedExceptionally()); - executor.shutdown(); + { + @Cleanup("shutdown") + ExecutorService executor = Executors.newSingleThreadExecutor(); + CompletableFuture future = new CompletableFuture<>(); + executor.execute(() -> { + try { + PulsarClientTool pulsarClientToolConsumer = new PulsarClientTool(properties); + String[] args = {"consume", "-t", "Exclusive", "-s", "sub-name", "-n", + Integer.toString(numberOfMessages), "--hex", "-m", "Durable", "-r", "30", topicName}; + Assert.assertEquals(pulsarClientToolConsumer.run(args), 0); + future.complete(null); + } catch (Throwable t) { + future.completeExceptionally(t); + } + }); + + // Make sure subscription has been created + Awaitility.await() + .pollInterval(Duration.ofMillis(200)) + .ignoreExceptions().untilAsserted(() -> { + Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 1); + }); + + PulsarClientTool pulsarClientToolProducer = new PulsarClientTool(properties); + + String[] args = {"produce", "--messages", "Have a nice day", "-n", Integer.toString(numberOfMessages), "-r", + "20", "-p", "key1=value1", "-p", "key2=value2", "-k", "partition_key", topicName}; + Assert.assertEquals(pulsarClientToolProducer.run(args), 0); + future.get(); + Assert.assertFalse(future.isCompletedExceptionally()); + } //wait for close Thread.sleep(2000); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java index 8378aa5a81d48..17ec7c8b842a1 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/FutureUtilTest.java @@ -30,6 +30,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; +import lombok.Cleanup; import org.testng.annotations.Test; public class FutureUtilTest { @@ -50,6 +51,7 @@ public void testCreateTimeoutException() { @Test public void testTimeoutHandling() { CompletableFuture future = new CompletableFuture<>(); + @Cleanup("shutdownNow") ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Exception e = new Exception(); try { @@ -60,26 +62,22 @@ public void testTimeoutHandling() { fail("Shouldn't occur"); } catch (ExecutionException executionException) { assertEquals(executionException.getCause(), e); - } finally { - executor.shutdownNow(); } } @Test public void testTimeoutHandlingNoTimeout() throws ExecutionException, InterruptedException { CompletableFuture future = new CompletableFuture<>(); + @Cleanup("shutdownNow") ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - try { - FutureUtil.addTimeoutHandling(future, Duration.ofMillis(100), executor, () -> new Exception()); - future.complete(null); - future.get(); - } finally { - executor.shutdownNow(); - } + FutureUtil.addTimeoutHandling(future, Duration.ofMillis(100), executor, () -> new Exception()); + future.complete(null); + future.get(); } @Test public void testCreatingFutureWithTimeoutHandling() { + @Cleanup("shutdownNow") ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Exception e = new Exception(); try { @@ -91,8 +89,6 @@ public void testCreatingFutureWithTimeoutHandling() { fail("Shouldn't occur"); } catch (ExecutionException executionException) { assertEquals(executionException.getCause(), e); - } finally { - executor.shutdownNow(); } } } \ No newline at end of file diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java index cac262607d844..bb743cbbb2990 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongHashMapTest.java @@ -40,6 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.LongFunction; +import lombok.Cleanup; import org.testng.annotations.Test; public class ConcurrentLongHashMapTest { @@ -167,6 +168,7 @@ public void testRehashingWithDeletes() { @Test public void concurrentInsertions() throws Throwable { ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -195,13 +197,12 @@ public void concurrentInsertions() throws Throwable { } assertEquals(map.size(), N * nThreads); - - executor.shutdown(); } @Test public void concurrentInsertionsAndReads() throws Throwable { ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -230,13 +231,12 @@ public void concurrentInsertionsAndReads() throws Throwable { } assertEquals(map.size(), N * nThreads); - - executor.shutdown(); } @Test public void stressConcurrentInsertionsAndReads() throws Throwable { ConcurrentLongHashMap map = new ConcurrentLongHashMap<>(4, 1); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int writeThreads = 16; final int readThreads = 16; @@ -282,7 +282,6 @@ public void stressConcurrentInsertionsAndReads() throws Throwable { future.get(); } assertEquals(map.size(), n * writeThreads); - executor.shutdown(); } @Test diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java index c51616aa857b9..82cac712975ed 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentLongPairSetTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import lombok.Cleanup; import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair; import org.testng.annotations.Test; @@ -177,6 +178,7 @@ public void testRehashingWithDeletes() { @Test public void concurrentInsertions() throws Throwable { ConcurrentLongPairSet set = new ConcurrentLongPairSet(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -204,13 +206,12 @@ public void concurrentInsertions() throws Throwable { } assertEquals(set.size(), N * nThreads); - - executor.shutdown(); } @Test public void concurrentInsertionsAndReads() throws Throwable { ConcurrentLongPairSet map = new ConcurrentLongPairSet(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -238,8 +239,6 @@ public void concurrentInsertionsAndReads() throws Throwable { } assertEquals(map.size(), N * nThreads); - - executor.shutdown(); } @Test @@ -394,7 +393,7 @@ public void testEqualsObjects() { assertFalse(set.contains(t1, t1)); assertFalse(set.contains(t1_b, t1_b)); } - + @Test public void testToString() { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java index c7641a8b6cf16..58f6ee5e85c6b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashMapTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import lombok.Cleanup; import org.testng.annotations.Test; import com.google.common.collect.Lists; @@ -153,6 +154,7 @@ public void testRehashingWithDeletes() { @Test public void concurrentInsertions() throws Throwable { ConcurrentOpenHashMap map = new ConcurrentOpenHashMap<>(16, 1); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -181,13 +183,12 @@ public void concurrentInsertions() throws Throwable { } assertEquals(map.size(), N * nThreads); - - executor.shutdown(); } @Test public void concurrentInsertionsAndReads() throws Throwable { ConcurrentOpenHashMap map = new ConcurrentOpenHashMap<>(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -216,8 +217,6 @@ public void concurrentInsertionsAndReads() throws Throwable { } assertEquals(map.size(), N * nThreads); - - executor.shutdown(); } @Test diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java index eec176bea2968..3c1d99668d733 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentOpenHashSetTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import lombok.Cleanup; import org.testng.annotations.Test; import com.google.common.collect.Lists; @@ -145,6 +146,7 @@ public void testRehashingWithDeletes() { @Test public void concurrentInsertions() throws Throwable { ConcurrentOpenHashSet set = new ConcurrentOpenHashSet<>(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -172,13 +174,12 @@ public void concurrentInsertions() throws Throwable { } assertEquals(set.size(), N * nThreads); - - executor.shutdown(); } @Test public void concurrentInsertionsAndReads() throws Throwable { ConcurrentOpenHashSet map = new ConcurrentOpenHashSet<>(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -206,8 +207,6 @@ public void concurrentInsertionsAndReads() throws Throwable { } assertEquals(map.size(), N * nThreads); - - executor.shutdown(); } @Test diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java index 532d10fa9d665..821bb8819554b 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/ConcurrentSortedLongPairSetTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import lombok.Cleanup; import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet.LongPair; import org.testng.annotations.Test; @@ -83,6 +84,7 @@ public void testRemove() { @Test public void concurrentInsertions() throws Throwable { LongPairSet set = new ConcurrentSortedLongPairSet(16); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 8; @@ -110,8 +112,6 @@ public void concurrentInsertions() throws Throwable { } assertEquals(set.size(), N * nThreads); - - executor.shutdown(); } @Test diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueueTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueueTest.java index 3adfd418f56f5..7a7cb5949b76f 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueueTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/collections/GrowablePriorityLongPairQueueTest.java @@ -34,6 +34,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; +import lombok.Cleanup; import org.apache.pulsar.common.util.collections.GrowablePriorityLongPairQueue.LongPair; import org.testng.annotations.Test; @@ -162,6 +163,7 @@ public void testExpandWithDeletes() { @Test public void concurrentInsertions() throws Throwable { GrowablePriorityLongPairQueue queue = new GrowablePriorityLongPairQueue(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -189,13 +191,12 @@ public void concurrentInsertions() throws Throwable { } assertEquals(queue.size(), N * nThreads); - - executor.shutdown(); } @Test public void concurrentInsertionsAndReads() throws Throwable { GrowablePriorityLongPairQueue map = new GrowablePriorityLongPairQueue(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); final int nThreads = 16; @@ -223,8 +224,6 @@ public void concurrentInsertionsAndReads() throws Throwable { } assertEquals(map.size(), N * nThreads); - - executor.shutdown(); } @Test diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 3fba9e5cddd13..14a215b89e01f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -20,6 +20,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import lombok.AccessLevel; @@ -46,7 +47,7 @@ public class JavaInstance implements AutoCloseable { // for Async function max out standing items private final InstanceConfig instanceConfig; - private final Executor executor; + private final ExecutorService executor; @Getter private final LinkedBlockingQueue> pendingAsyncRequests; @@ -127,6 +128,7 @@ public CompletableFuture handleMessage(Record record, Ob @Override public void close() { context.close(); + executor.shutdown(); } public Map getAndResetMetrics() { diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index aefc6a9a5b909..59f36d935984e 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.functions.api.Function; @@ -57,6 +58,7 @@ public void testLambda() throws Exception { @Test public void testAsyncFunction() throws Exception { InstanceConfig instanceConfig = new InstanceConfig(); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); Function> function = (input, context) -> { @@ -83,7 +85,6 @@ public void testAsyncFunction() throws Exception { assertNotNull(result.get().getResult()); assertEquals(new String(testString + "-lambda"), result.get().getResult()); instance.close(); - executor.shutdownNow(); } @Test @@ -91,6 +92,7 @@ public void testAsyncFunctionMaxPending() throws Exception { InstanceConfig instanceConfig = new InstanceConfig(); int pendingQueueSize = 3; instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize); + @Cleanup("shutdownNow") ExecutorService executor = Executors.newCachedThreadPool(); Function> function = (input, context) -> { @@ -134,6 +136,5 @@ public void testAsyncFunctionMaxPending() throws Exception { log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime); instance.close(); - executor.shutdownNow(); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 9c17aa05046e6..8627240b90561 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -49,8 +49,6 @@ public class Worker { private ZooKeeperClientFactory zkClientFactory = null; private final OrderedExecutor orderedExecutor = OrderedExecutor.newBuilder().numThreads(8).name("zk-cache-ordered").build(); - private final ScheduledExecutorService cacheExecutor = Executors.newScheduledThreadPool(10, - new DefaultThreadFactory("zk-cache-callback")); private PulsarResources pulsarResources; private MetadataStoreExtended configMetadataStore; private ConfigurationMetadataCacheService configurationCacheService; @@ -68,7 +66,7 @@ protected void start() throws Exception { server = new WorkerServer(workerService, getAuthenticationService()); server.start(); log.info("/** Started worker server on port={} **/", this.workerConfig.getWorkerPort()); - + try { errorNotifier.waitForError(); } catch (Throwable th) { @@ -127,6 +125,10 @@ protected void stop() { log.warn("Failed to close global zk cache ", e); } } + + if (orderedExecutor != null) { + orderedExecutor.shutdownNow(); + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 86e2f40b52bf6..7297b31931f76 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -139,7 +139,7 @@ public void setup() { @AfterMethod(alwaysRun = true) public void stop() { - this.executor.shutdown(); + this.executor.shutdownNow(); } @Test @@ -153,7 +153,7 @@ public void testSchedule() throws Exception { .build(); functionMetaDataList.add(function1); doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData(); - + ThreadRuntimeFactory factory = mock(ThreadRuntimeFactory.class); doReturn(factory).when(functionRuntimeManager).getRuntimeFactory(); diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java index 8a3b5cb51024b..5059b5b68c1a3 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/BookKeeperClusterTestCase.java @@ -118,7 +118,7 @@ public void tearDown() throws Exception { stopBKCluster(); // stop zookeeper service stopZKCluster(); - executor.shutdown(); + executor.shutdownNow(); } /** diff --git a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java index 8ff94dfec079c..431728aa7183e 100644 --- a/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java +++ b/pulsar-package-management/bookkeeper-storage/src/test/java/org/apache/pulsar/packages/management/storage/bookkeeper/bookkeeper/test/MockedBookKeeperTestCase.java @@ -107,8 +107,8 @@ public void setUpClass() { @AfterClass(alwaysRun = true) public void tearDownClass() { - executor.shutdown(); - cachedExecutor.shutdown(); + executor.shutdownNow(); + cachedExecutor.shutdownNow(); } /** diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java index ac9b5e7080056..dbf72d45c5402 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/test/MockedBookKeeperTestCase.java @@ -107,8 +107,8 @@ public void setUpClass() { @AfterClass(alwaysRun = true) public void tearDownClass() { - executor.shutdown(); - cachedExecutor.shutdown(); + executor.shutdownNow(); + cachedExecutor.shutdownNow(); } /** diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImplTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImplTest.java index 4ccc954a7d1e0..14caca56641f9 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImplTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperBkClientFactoryImplTest.java @@ -51,7 +51,7 @@ void setup() throws Exception { @AfterMethod(alwaysRun = true) void teardown() throws Exception { localZkS.close(); - executor.shutdown(); + executor.shutdownNow(); } @Test diff --git a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java index 05227665bdae2..51f2c0a81116a 100644 --- a/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java +++ b/pulsar-zookeeper-utils/src/test/java/org/apache/pulsar/zookeeper/ZookeeperCacheTest.java @@ -48,6 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; +import lombok.Cleanup; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.zookeeper.AsyncCallback.DataCallback; @@ -92,8 +93,8 @@ void classSetup() throws Exception { @AfterClass(alwaysRun = true) void classTeardown() throws Exception { - executor.shutdown(); - scheduledExecutor.shutdown(); + executor.shutdownNow(); + scheduledExecutor.shutdownNow(); } @@ -411,8 +412,11 @@ public String deserialize(String key, byte[] content) throws Exception { */ @Test(timeOut = 2000) public void testZkCallbackThreadStuck() throws Exception { + @Cleanup("shutdownNow") OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build(); + @Cleanup("shutdownNow") ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2); + @Cleanup("shutdownNow") ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle // callback-result process @@ -445,9 +449,6 @@ public String deserialize(String key, byte[] content) throws Exception { }); latch.await(); - executor.shutdown(); - zkExecutor.shutdown(); - scheduledExecutor.shutdown(); } /** @@ -461,6 +462,7 @@ public String deserialize(String key, byte[] content) throws Exception { */ @Test(timeOut = 10000) public void testInvalidateCacheOnFailure() throws Exception { + @Cleanup("shutdownNow") ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); // add readOpDelayMs so, main thread will not serve zkCacahe-returned future and let zkExecutor-thread handle // callback-result process @@ -515,7 +517,6 @@ public String deserialize(String key, byte[] content) throws Exception { Thread.sleep(1000); // (6) now, cache should be invalidate failed-future and should refetch the data assertEquals(zkCache.getAsync(key1).get().get(), value); - zkExecutor.shutdown(); } /** @@ -526,9 +527,11 @@ public String deserialize(String key, byte[] content) throws Exception { */ @Test public void testTimedOutZKCacheRequestInvalidates() throws Exception { - + @Cleanup("shutdownNow") OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build(); + @Cleanup("shutdownNow") ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2); + @Cleanup("shutdownNow") ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); MockZooKeeper zkSession = spy(MockZooKeeper.newInstance(MoreExecutors.newDirectExecutorService())); @@ -558,22 +561,21 @@ public String deserialize(String key, byte[] content) throws Exception { }, 5, 1000); assertNull(zkCacheService.dataCache.getIfPresent(path)); - - executor.shutdown(); - zkExecutor.shutdown(); - scheduledExecutor.shutdown(); } /** * Test to verify {@link ZooKeeperCache} renews cache data after expiry time in background. - * + * * @throws Exception */ @Test public void testZKRefreshExpiredEntry() throws Exception { int cacheExpiryTimeSec = 1; + @Cleanup("shutdownNow") OrderedScheduler executor = OrderedScheduler.newSchedulerBuilder().build(); + @Cleanup("shutdownNow") ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2); + @Cleanup("shutdownNow") ExecutorService zkExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("mockZk")); String path = "/test"; @@ -606,10 +608,6 @@ public String deserialize(String key, byte[] content) throws Exception { }, 5, 1000); assertEquals(zkCache.get(path).get(), val2); - - executor.shutdown(); - zkExecutor.shutdown(); - scheduledExecutor.shutdown(); } static class ZooKeeperCacheTest extends ZooKeeperCache {