Skip to content

Commit 41814a7

Browse files
authored
Fix some ExecutorService leaks and use @cleanup("shutdownNow") for cleanup (#10198)
- use ExecutorService.shutdownNow() instead of ExecutorService.shutdown() in tests
1 parent b0f3ae0 commit 41814a7

File tree

62 files changed

+362
-341
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

62 files changed

+362
-341
lines changed

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060

6161
import io.netty.buffer.ByteBuf;
6262
import io.netty.buffer.ByteBufAllocator;
63+
import lombok.Cleanup;
6364
import org.apache.bookkeeper.client.BKException;
6465
import org.apache.bookkeeper.client.BookKeeper;
6566
import org.apache.bookkeeper.client.BookKeeper.DigestType;
@@ -708,6 +709,7 @@ void testConcurrentResetCursor() throws Exception {
708709
final int Consumers = 5;
709710

710711
List<Future<AtomicBoolean>> futures = Lists.newArrayList();
712+
@Cleanup("shutdownNow")
711713
ExecutorService executor = Executors.newCachedThreadPool();
712714
final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);
713715

@@ -1727,6 +1729,7 @@ void testReadEntriesOrWaitBlocking() throws Exception {
17271729
final int Consumers = 10;
17281730

17291731
List<Future<Void>> futures = Lists.newArrayList();
1732+
@Cleanup("shutdownNow")
17301733
ExecutorService executor = Executors.newCachedThreadPool();
17311734
final CyclicBarrier barrier = new CyclicBarrier(Consumers + 1);
17321735

@@ -3465,7 +3468,7 @@ public void deleteFailed(ManagedLedgerException exception, Object ctx) {
34653468

34663469
} finally {
34673470
factory2.shutdown();
3468-
}
3471+
}
34693472
});
34703473

34713474
factory1.shutdown();

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import java.util.concurrent.atomic.AtomicInteger;
6464
import java.util.concurrent.atomic.AtomicReference;
6565
import java.util.function.Predicate;
66+
import lombok.Cleanup;
6667
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
6768
import org.apache.bookkeeper.client.BKException;
6869
import org.apache.bookkeeper.client.BookKeeper;
@@ -1271,6 +1272,7 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
12711272
public void testConcurrentAsyncSetProperties() throws Exception {
12721273
final CountDownLatch latch = new CountDownLatch(1000);
12731274
ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
1275+
@Cleanup("shutdownNow")
12741276
ExecutorService executor = Executors.newCachedThreadPool();
12751277
for (int i = 0; i < 1000; i++) {
12761278
final int finalI = i;
@@ -1302,7 +1304,6 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
13021304
fail(e.getMessage());
13031305
}
13041306
assertTrue(latch.await(300, TimeUnit.SECONDS));
1305-
executor.shutdown();
13061307
factory.shutdown();
13071308
}
13081309

@@ -1447,7 +1448,8 @@ public void testOpenRaceCondition() throws Exception {
14471448

14481449
final int N = 1000;
14491450
final Position position = ledger.addEntry("entry-0".getBytes());
1450-
Executor executor = Executors.newCachedThreadPool();
1451+
@Cleanup("shutdownNow")
1452+
ExecutorService executor = Executors.newCachedThreadPool();
14511453
final CountDownLatch counter = new CountDownLatch(2);
14521454
executor.execute(() -> {
14531455
try {
@@ -2290,6 +2292,7 @@ public void testLazyRecoverCursor() throws Exception {
22902292

22912293
// Simulating time consuming cursor recovery.
22922294
CompletableFuture<Void> future = bkc.promiseAfter(2);
2295+
@Cleanup("shutdownNow")
22932296
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lazyCursorRecovery"));
22942297
scheduledExecutorService.schedule(() -> {
22952298
future.complete(null);

managed-ledger/src/test/java/org/apache/bookkeeper/test/BookKeeperClusterTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public void tearDown() throws Exception {
126126
stopBKCluster();
127127
// stop zookeeper service
128128
stopZKCluster();
129-
executor.shutdown();
129+
executor.shutdownNow();
130130
}
131131

132132
/**

managed-ledger/src/test/java/org/apache/bookkeeper/test/MockedBookKeeperTestCase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,10 @@ public final void setUpClass() {
119119
@AfterClass(alwaysRun = true)
120120
public final void tearDownClass() {
121121
if (executor != null) {
122-
executor.shutdown();
122+
executor.shutdownNow();
123123
}
124124
if (cachedExecutor != null) {
125-
cachedExecutor.shutdown();
125+
cachedExecutor.shutdownNow();
126126
}
127127
}
128128

pulsar-broker/src/main/java/org/apache/pulsar/broker/MessagingServiceShutdownHook.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.TimeUnit;
2727
import java.util.concurrent.TimeoutException;
2828
import java.util.function.Consumer;
29+
import lombok.Cleanup;
2930
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
3031
import org.apache.zookeeper.ZooKeeper.States;
3132
import org.slf4j.ILoggerFactory;
@@ -52,6 +53,7 @@ public void run() {
5253
+ service.getSafeWebServiceAddress() + ", broker url=" + service.getSafeBrokerServiceUrl());
5354
}
5455

56+
@Cleanup("shutdownNow")
5557
ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("shutdown-thread"));
5658

5759
try {

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,11 @@ public CompletableFuture<Void> closeAsync() {
438438
transactionBufferClient.close();
439439
}
440440

441+
if (transactionExecutor != null) {
442+
transactionExecutor.shutdown();
443+
transactionExecutor = null;
444+
}
445+
441446
if (coordinationService != null) {
442447
coordinationService.close();
443448
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/MockedBookKeeperClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,6 @@ public void close() {
7474
mockedBk.close();
7575
} catch (BKException | InterruptedException ignored) {
7676
}
77-
executor.shutdown();
77+
executor.shutdownNow();
7878
}
7979
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/SLAMonitoringTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ private void createTenant(PulsarAdmin pulsarAdmin)
126126
@AfterClass(alwaysRun = true)
127127
public void shutdown() throws Exception {
128128
log.info("--- Shutting down ---");
129-
executor.shutdown();
129+
executor.shutdownNow();
130130
executor = null;
131131

132132
for (int i = 0; i < BROKER_COUNT; i++) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1346,6 +1346,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
13461346
assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
13471347
}
13481348

1349+
@Cleanup("shutdownNow")
13491350
ExecutorService executorService = Executors.newCachedThreadPool();
13501351

13511352
try {
@@ -1403,7 +1404,6 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
14031404
}
14041405

14051406
producer.close();
1406-
executorService.shutdown();
14071407
}
14081408

14091409
@Test

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -999,6 +999,7 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
999999
assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
10001000
}
10011001

1002+
@Cleanup("shutdownNow")
10021003
ExecutorService executorService = Executors.newCachedThreadPool();
10031004

10041005

@@ -1083,7 +1084,6 @@ public void testNamespaceSplitBundleConcurrent() throws Exception {
10831084
}
10841085

10851086
producer.close();
1086-
executorService.shutdownNow();
10871087
}
10881088

10891089
@Test

0 commit comments

Comments
 (0)