Skip to content

Commit 3ac1fbc

Browse files
authored
[MNG-8565] Thread leaks in maven-cli (#2090)
One runtime and one testtime thread leak. --- https://issues.apache.org/jira/browse/MNG-8565
1 parent 5b47021 commit 3ac1fbc

File tree

4 files changed

+123
-104
lines changed

4 files changed

+123
-104
lines changed

impl/maven-cli/src/main/java/org/apache/maven/cling/invoker/mvn/MavenInvoker.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -364,10 +364,12 @@ protected TransferListener determineTransferListener(MavenContext context, boole
364364
if (quiet || noTransferProgress || quietCI) {
365365
delegate = new QuietMavenTransferListener();
366366
} else if (context.interactive && !logFile) {
367-
delegate = new SimplexTransferListener(new ConsoleMavenTransferListener(
367+
SimplexTransferListener simplex = new SimplexTransferListener(new ConsoleMavenTransferListener(
368368
context.invokerRequest.messageBuilderFactory(),
369369
context.terminal.writer(),
370370
context.invokerRequest.options().verbose().orElse(false)));
371+
context.closeables.add(simplex);
372+
delegate = simplex;
371373
} else {
372374
delegate = new Slf4jMavenTransferListener();
373375
}

impl/maven-cli/src/main/java/org/apache/maven/cling/transfer/SimplexTransferListener.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.ArrayBlockingQueue;
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.function.Consumer;
2728

2829
import org.eclipse.aether.transfer.AbstractTransferListener;
@@ -42,14 +43,16 @@
4243
*
4344
* @since 4.0.0
4445
*/
45-
public final class SimplexTransferListener extends AbstractTransferListener {
46+
public final class SimplexTransferListener extends AbstractTransferListener implements AutoCloseable {
4647
private static final Logger LOGGER = LoggerFactory.getLogger(SimplexTransferListener.class);
4748
private static final int QUEUE_SIZE = 1024;
4849
private static final int BATCH_MAX_SIZE = 500;
4950
private final TransferListener delegate;
5051
private final int batchMaxSize;
5152
private final boolean blockOnLastEvent;
5253
private final ArrayBlockingQueue<Exchange> eventQueue;
54+
private final AtomicBoolean closed;
55+
private final Thread updater;
5356

5457
/**
5558
* Constructor that makes passed in delegate run on single thread, and will block on last event.
@@ -76,11 +79,19 @@ public SimplexTransferListener(
7679
this.blockOnLastEvent = blockOnLastEvent;
7780

7881
this.eventQueue = new ArrayBlockingQueue<>(queueSize);
79-
Thread updater = new Thread(this::feedConsumer);
82+
this.closed = new AtomicBoolean(false);
83+
this.updater = new Thread(this::feedConsumer, "simplex-transfer-listener");
8084
updater.setDaemon(true);
8185
updater.start();
8286
}
8387

88+
@Override
89+
public void close() {
90+
if (closed.compareAndSet(false, true)) {
91+
updater.interrupt();
92+
}
93+
}
94+
8495
public TransferListener getDelegate() {
8596
return delegate;
8697
}
@@ -95,8 +106,8 @@ private void feedConsumer() {
95106
}
96107
demux(batch);
97108
}
98-
} catch (InterruptedException e) {
99-
throw new RuntimeException(e);
109+
} catch (InterruptedException ignored) {
110+
// silent
100111
}
101112
}
102113

impl/maven-cli/src/test/java/org/apache/maven/cling/transfer/ConsoleMavenTransferListenerTest.java

Lines changed: 75 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -45,79 +45,85 @@ class ConsoleMavenTransferListenerTest {
4545
void testTransferProgressedWithPrintResourceNames() throws Exception {
4646
int size = 1000;
4747
ExecutorService service = Executors.newFixedThreadPool(size * 2);
48-
startLatch = new CountDownLatch(size);
49-
endLatch = new CountDownLatch(size);
50-
Map<String, String> output = new ConcurrentHashMap<String, String>();
51-
52-
TransferListener listener = new SimplexTransferListener(new ConsoleMavenTransferListener(
53-
new JLineMessageBuilderFactory(),
54-
new PrintWriter(System.out) {
55-
56-
@Override
57-
public void print(Object o) {
58-
59-
String string = o.toString();
60-
int i = string.length() - 1;
61-
while (i >= 0) {
62-
char c = string.charAt(i);
63-
if (c == '\n' || c == '\r' || c == ' ') i--;
64-
else break;
48+
try {
49+
startLatch = new CountDownLatch(size);
50+
endLatch = new CountDownLatch(size);
51+
Map<String, String> output = new ConcurrentHashMap<String, String>();
52+
53+
try (SimplexTransferListener listener = new SimplexTransferListener(new ConsoleMavenTransferListener(
54+
new JLineMessageBuilderFactory(),
55+
new PrintWriter(System.out) {
56+
57+
@Override
58+
public void print(Object o) {
59+
60+
String string = o.toString();
61+
int i = string.length() - 1;
62+
while (i >= 0) {
63+
char c = string.charAt(i);
64+
if (c == '\n' || c == '\r' || c == ' ') i--;
65+
else break;
66+
}
67+
68+
string = string.substring(0, i + 1).trim();
69+
output.put(string, string);
70+
System.out.print(o);
6571
}
66-
67-
string = string.substring(0, i + 1).trim();
68-
output.put(string, string);
69-
System.out.print(o);
72+
},
73+
true))) {
74+
TransferResource resource =
75+
new TransferResource(null, null, "http://maven.org/test/test-resource", new File(""), null);
76+
resource.setContentLength(size - 1);
77+
78+
DefaultRepositorySystemSession session =
79+
new DefaultRepositorySystemSession(h -> false); // no close handle
80+
81+
// warm up
82+
test(listener, session, resource, 0);
83+
84+
for (int i = 1; i < size; i++) {
85+
final int bytes = i;
86+
87+
service.execute(() -> {
88+
test(listener, session, resource, bytes);
89+
});
90+
}
91+
92+
// start all threads at once
93+
try {
94+
startLatch.await();
95+
} catch (InterruptedException e) {
96+
e.printStackTrace();
97+
}
98+
99+
// wait for all thread to end
100+
try {
101+
endLatch.await();
102+
} catch (InterruptedException e) {
103+
e.printStackTrace();
104+
}
105+
106+
// despite all are back, we need to make sure all the events are processed (are async)
107+
// this one should block until all processed
108+
listener.transferSucceeded(new TransferEvent.Builder(session, resource)
109+
.setType(TransferEvent.EventType.SUCCEEDED)
110+
.build());
111+
112+
StringBuilder message = new StringBuilder("Messages [");
113+
boolean test = true;
114+
for (int i = 0; i < 999; i++) {
115+
boolean ok = output.containsKey("Progress (1): test-resource (" + i + "/999 B)");
116+
if (!ok) {
117+
System.out.println("false : " + i);
118+
message.append(i + ",");
70119
}
71-
},
72-
true));
73-
TransferResource resource =
74-
new TransferResource(null, null, "http://maven.org/test/test-resource", new File(""), null);
75-
resource.setContentLength(size - 1);
76-
77-
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession(h -> false); // no close handle
78-
79-
// warm up
80-
test(listener, session, resource, 0);
81-
82-
for (int i = 1; i < size; i++) {
83-
final int bytes = i;
84-
85-
service.execute(() -> {
86-
test(listener, session, resource, bytes);
87-
});
88-
}
89-
90-
// start all threads at once
91-
try {
92-
startLatch.await();
93-
} catch (InterruptedException e) {
94-
e.printStackTrace();
95-
}
96-
97-
// wait for all thread to end
98-
try {
99-
endLatch.await();
100-
} catch (InterruptedException e) {
101-
e.printStackTrace();
102-
}
103-
104-
// despite all are back, we need to make sure all the events are processed (are async)
105-
// this one should block until all processed
106-
listener.transferSucceeded(new TransferEvent.Builder(session, resource)
107-
.setType(TransferEvent.EventType.SUCCEEDED)
108-
.build());
109-
110-
StringBuilder message = new StringBuilder("Messages [");
111-
boolean test = true;
112-
for (int i = 0; i < 999; i++) {
113-
boolean ok = output.containsKey("Progress (1): test-resource (" + i + "/999 B)");
114-
if (!ok) {
115-
System.out.println("false : " + i);
116-
message.append(i + ",");
120+
test = test & ok;
121+
}
122+
assertTrue(test, message + "] are missing in " + output);
117123
}
118-
test = test & ok;
124+
} finally {
125+
service.shutdown();
119126
}
120-
assertTrue(test, message + "] are missing in " + output);
121127
}
122128

123129
private void test(

impl/maven-cli/src/test/java/org/apache/maven/cling/transfer/SimplexTransferListenerTest.java

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -62,21 +62,21 @@ public void transferSucceeded(TransferEvent event) {}
6262
public void transferFailed(TransferEvent event) {}
6363
};
6464

65-
SimplexTransferListener listener = new SimplexTransferListener(delegate);
65+
try (SimplexTransferListener listener = new SimplexTransferListener(delegate)) {
66+
TransferResource resource =
67+
new TransferResource(null, null, "http://maven.org/test/test-resource", new File("file"), null);
68+
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession(h -> false); // no close handle
6669

67-
TransferResource resource =
68-
new TransferResource(null, null, "http://maven.org/test/test-resource", new File("file"), null);
69-
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession(h -> false); // no close handle
70+
// for technical reasons we cannot throw here, even if delegate does cancel transfer
71+
listener.transferInitiated(event(session, resource, TransferEvent.EventType.INITIATED));
7072

71-
// for technical reasons we cannot throw here, even if delegate does cancel transfer
72-
listener.transferInitiated(event(session, resource, TransferEvent.EventType.INITIATED));
73+
Thread.sleep(500); // to make sure queue is processed, cancellation applied
7374

74-
Thread.sleep(500); // to make sure queue is processed, cancellation applied
75-
76-
// subsequent call will cancel
77-
assertThrows(
78-
TransferCancelledException.class,
79-
() -> listener.transferStarted(event(session, resource, TransferEvent.EventType.STARTED)));
75+
// subsequent call will cancel
76+
assertThrows(
77+
TransferCancelledException.class,
78+
() -> listener.transferStarted(event(session, resource, TransferEvent.EventType.STARTED)));
79+
}
8080
}
8181

8282
@Test
@@ -85,24 +85,24 @@ void handlesAbsentTransferSource() throws InterruptedException, TransferCancelle
8585

8686
RepositorySystemSession session = Mockito.mock(RepositorySystemSession.class);
8787
TransferListener delegate = Mockito.mock(TransferListener.class);
88-
SimplexTransferListener listener = new SimplexTransferListener(delegate);
89-
90-
TransferEvent transferInitiatedEvent = event(session, resource, TransferEvent.EventType.INITIATED);
91-
TransferEvent transferStartedEvent = event(session, resource, TransferEvent.EventType.STARTED);
92-
TransferEvent transferProgressedEvent = event(session, resource, TransferEvent.EventType.PROGRESSED);
93-
TransferEvent transferSucceededEvent = event(session, resource, TransferEvent.EventType.SUCCEEDED);
94-
95-
listener.transferInitiated(transferInitiatedEvent);
96-
listener.transferStarted(transferStartedEvent);
97-
listener.transferProgressed(transferProgressedEvent);
98-
listener.transferSucceeded(transferSucceededEvent);
99-
100-
Thread.sleep(500); // to make sure queue is processed, cancellation applied
101-
102-
Mockito.verify(delegate).transferInitiated(transferInitiatedEvent);
103-
Mockito.verify(delegate).transferStarted(transferStartedEvent);
104-
Mockito.verify(delegate).transferProgressed(transferProgressedEvent);
105-
Mockito.verify(delegate).transferSucceeded(transferSucceededEvent);
88+
try (SimplexTransferListener listener = new SimplexTransferListener(delegate)) {
89+
TransferEvent transferInitiatedEvent = event(session, resource, TransferEvent.EventType.INITIATED);
90+
TransferEvent transferStartedEvent = event(session, resource, TransferEvent.EventType.STARTED);
91+
TransferEvent transferProgressedEvent = event(session, resource, TransferEvent.EventType.PROGRESSED);
92+
TransferEvent transferSucceededEvent = event(session, resource, TransferEvent.EventType.SUCCEEDED);
93+
94+
listener.transferInitiated(transferInitiatedEvent);
95+
listener.transferStarted(transferStartedEvent);
96+
listener.transferProgressed(transferProgressedEvent);
97+
listener.transferSucceeded(transferSucceededEvent);
98+
99+
Thread.sleep(500); // to make sure queue is processed, cancellation applied
100+
101+
Mockito.verify(delegate).transferInitiated(transferInitiatedEvent);
102+
Mockito.verify(delegate).transferStarted(transferStartedEvent);
103+
Mockito.verify(delegate).transferProgressed(transferProgressedEvent);
104+
Mockito.verify(delegate).transferSucceeded(transferSucceededEvent);
105+
}
106106
}
107107

108108
private static TransferEvent event(

0 commit comments

Comments
 (0)