Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,12 @@ protected TransferListener determineTransferListener(MavenContext context, boole
if (quiet || noTransferProgress || quietCI) {
delegate = new QuietMavenTransferListener();
} else if (context.interactive && !logFile) {
delegate = new SimplexTransferListener(new ConsoleMavenTransferListener(
SimplexTransferListener simplex = new SimplexTransferListener(new ConsoleMavenTransferListener(
context.invokerRequest.messageBuilderFactory(),
context.terminal.writer(),
context.invokerRequest.options().verbose().orElse(false)));
context.closeables.add(simplex);
delegate = simplex;
} else {
delegate = new Slf4jMavenTransferListener();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.eclipse.aether.transfer.AbstractTransferListener;
Expand All @@ -42,14 +43,16 @@
*
* @since 4.0.0
*/
public final class SimplexTransferListener extends AbstractTransferListener {
public final class SimplexTransferListener extends AbstractTransferListener implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(SimplexTransferListener.class);
private static final int QUEUE_SIZE = 1024;
private static final int BATCH_MAX_SIZE = 500;
private final TransferListener delegate;
private final int batchMaxSize;
private final boolean blockOnLastEvent;
private final ArrayBlockingQueue<Exchange> eventQueue;
private final AtomicBoolean closed;
private final Thread updater;

/**
* Constructor that makes passed in delegate run on single thread, and will block on last event.
Expand All @@ -76,11 +79,19 @@ public SimplexTransferListener(
this.blockOnLastEvent = blockOnLastEvent;

this.eventQueue = new ArrayBlockingQueue<>(queueSize);
Thread updater = new Thread(this::feedConsumer);
this.closed = new AtomicBoolean(false);
this.updater = new Thread(this::feedConsumer, "simplex-transfer-listener");
updater.setDaemon(true);
updater.start();
}

@Override
public void close() {
if (closed.compareAndSet(false, true)) {
updater.interrupt();
}
}

public TransferListener getDelegate() {
return delegate;
}
Expand All @@ -95,8 +106,8 @@ private void feedConsumer() {
}
demux(batch);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (InterruptedException ignored) {
// silent
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,79 +45,85 @@ class ConsoleMavenTransferListenerTest {
void testTransferProgressedWithPrintResourceNames() throws Exception {
int size = 1000;
ExecutorService service = Executors.newFixedThreadPool(size * 2);
startLatch = new CountDownLatch(size);
endLatch = new CountDownLatch(size);
Map<String, String> output = new ConcurrentHashMap<String, String>();

TransferListener listener = new SimplexTransferListener(new ConsoleMavenTransferListener(
new JLineMessageBuilderFactory(),
new PrintWriter(System.out) {

@Override
public void print(Object o) {

String string = o.toString();
int i = string.length() - 1;
while (i >= 0) {
char c = string.charAt(i);
if (c == '\n' || c == '\r' || c == ' ') i--;
else break;
try {
startLatch = new CountDownLatch(size);
endLatch = new CountDownLatch(size);
Map<String, String> output = new ConcurrentHashMap<String, String>();

try (SimplexTransferListener listener = new SimplexTransferListener(new ConsoleMavenTransferListener(
new JLineMessageBuilderFactory(),
new PrintWriter(System.out) {

@Override
public void print(Object o) {

String string = o.toString();
int i = string.length() - 1;
while (i >= 0) {
char c = string.charAt(i);
if (c == '\n' || c == '\r' || c == ' ') i--;
else break;
}

string = string.substring(0, i + 1).trim();
output.put(string, string);
System.out.print(o);
}

string = string.substring(0, i + 1).trim();
output.put(string, string);
System.out.print(o);
},
true))) {
TransferResource resource =
new TransferResource(null, null, "http://maven.org/test/test-resource", new File(""), null);
resource.setContentLength(size - 1);

DefaultRepositorySystemSession session =
new DefaultRepositorySystemSession(h -> false); // no close handle

// warm up
test(listener, session, resource, 0);

for (int i = 1; i < size; i++) {
final int bytes = i;

service.execute(() -> {
test(listener, session, resource, bytes);
});
}

// start all threads at once
try {
startLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

// wait for all thread to end
try {
endLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

// despite all are back, we need to make sure all the events are processed (are async)
// this one should block until all processed
listener.transferSucceeded(new TransferEvent.Builder(session, resource)
.setType(TransferEvent.EventType.SUCCEEDED)
.build());

StringBuilder message = new StringBuilder("Messages [");
boolean test = true;
for (int i = 0; i < 999; i++) {
boolean ok = output.containsKey("Progress (1): test-resource (" + i + "/999 B)");
if (!ok) {
System.out.println("false : " + i);
message.append(i + ",");
}
},
true));
TransferResource resource =
new TransferResource(null, null, "http://maven.org/test/test-resource", new File(""), null);
resource.setContentLength(size - 1);

DefaultRepositorySystemSession session = new DefaultRepositorySystemSession(h -> false); // no close handle

// warm up
test(listener, session, resource, 0);

for (int i = 1; i < size; i++) {
final int bytes = i;

service.execute(() -> {
test(listener, session, resource, bytes);
});
}

// start all threads at once
try {
startLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

// wait for all thread to end
try {
endLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}

// despite all are back, we need to make sure all the events are processed (are async)
// this one should block until all processed
listener.transferSucceeded(new TransferEvent.Builder(session, resource)
.setType(TransferEvent.EventType.SUCCEEDED)
.build());

StringBuilder message = new StringBuilder("Messages [");
boolean test = true;
for (int i = 0; i < 999; i++) {
boolean ok = output.containsKey("Progress (1): test-resource (" + i + "/999 B)");
if (!ok) {
System.out.println("false : " + i);
message.append(i + ",");
test = test & ok;
}
assertTrue(test, message + "] are missing in " + output);
}
test = test & ok;
} finally {
service.shutdown();
}
assertTrue(test, message + "] are missing in " + output);
}

private void test(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,21 @@ public void transferSucceeded(TransferEvent event) {}
public void transferFailed(TransferEvent event) {}
};

SimplexTransferListener listener = new SimplexTransferListener(delegate);
try (SimplexTransferListener listener = new SimplexTransferListener(delegate)) {
TransferResource resource =
new TransferResource(null, null, "http://maven.org/test/test-resource", new File("file"), null);
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession(h -> false); // no close handle

TransferResource resource =
new TransferResource(null, null, "http://maven.org/test/test-resource", new File("file"), null);
DefaultRepositorySystemSession session = new DefaultRepositorySystemSession(h -> false); // no close handle
// for technical reasons we cannot throw here, even if delegate does cancel transfer
listener.transferInitiated(event(session, resource, TransferEvent.EventType.INITIATED));

// for technical reasons we cannot throw here, even if delegate does cancel transfer
listener.transferInitiated(event(session, resource, TransferEvent.EventType.INITIATED));
Thread.sleep(500); // to make sure queue is processed, cancellation applied

Thread.sleep(500); // to make sure queue is processed, cancellation applied

// subsequent call will cancel
assertThrows(
TransferCancelledException.class,
() -> listener.transferStarted(event(session, resource, TransferEvent.EventType.STARTED)));
// subsequent call will cancel
assertThrows(
TransferCancelledException.class,
() -> listener.transferStarted(event(session, resource, TransferEvent.EventType.STARTED)));
}
}

@Test
Expand All @@ -85,24 +85,24 @@ void handlesAbsentTransferSource() throws InterruptedException, TransferCancelle

RepositorySystemSession session = Mockito.mock(RepositorySystemSession.class);
TransferListener delegate = Mockito.mock(TransferListener.class);
SimplexTransferListener listener = new SimplexTransferListener(delegate);

TransferEvent transferInitiatedEvent = event(session, resource, TransferEvent.EventType.INITIATED);
TransferEvent transferStartedEvent = event(session, resource, TransferEvent.EventType.STARTED);
TransferEvent transferProgressedEvent = event(session, resource, TransferEvent.EventType.PROGRESSED);
TransferEvent transferSucceededEvent = event(session, resource, TransferEvent.EventType.SUCCEEDED);

listener.transferInitiated(transferInitiatedEvent);
listener.transferStarted(transferStartedEvent);
listener.transferProgressed(transferProgressedEvent);
listener.transferSucceeded(transferSucceededEvent);

Thread.sleep(500); // to make sure queue is processed, cancellation applied

Mockito.verify(delegate).transferInitiated(transferInitiatedEvent);
Mockito.verify(delegate).transferStarted(transferStartedEvent);
Mockito.verify(delegate).transferProgressed(transferProgressedEvent);
Mockito.verify(delegate).transferSucceeded(transferSucceededEvent);
try (SimplexTransferListener listener = new SimplexTransferListener(delegate)) {
TransferEvent transferInitiatedEvent = event(session, resource, TransferEvent.EventType.INITIATED);
TransferEvent transferStartedEvent = event(session, resource, TransferEvent.EventType.STARTED);
TransferEvent transferProgressedEvent = event(session, resource, TransferEvent.EventType.PROGRESSED);
TransferEvent transferSucceededEvent = event(session, resource, TransferEvent.EventType.SUCCEEDED);

listener.transferInitiated(transferInitiatedEvent);
listener.transferStarted(transferStartedEvent);
listener.transferProgressed(transferProgressedEvent);
listener.transferSucceeded(transferSucceededEvent);

Thread.sleep(500); // to make sure queue is processed, cancellation applied

Mockito.verify(delegate).transferInitiated(transferInitiatedEvent);
Mockito.verify(delegate).transferStarted(transferStartedEvent);
Mockito.verify(delegate).transferProgressed(transferProgressedEvent);
Mockito.verify(delegate).transferSucceeded(transferSucceededEvent);
}
}

private static TransferEvent event(
Expand Down
Loading