Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -119,6 +120,21 @@ public void testOperatorLogging() {
Assertions.assertEquals("check.0 | onItem(HELLO)", logRecord.getMessage());
}

@Test
public void testContextPropagationCancelsUpstream() {
// See https://github.com/quarkusio/quarkus/issues/50513
CompletableFuture<String> cf = new CompletableFuture<>();
// we want to make sure that Infrastructure.setCompletableFutureWrapper uses .copy() in
// order to create a CF wrapper that forwards cancel upstream, which is what
// subscribeAsCompletionStage() calls.
// That cancel is then forwarded to the Uni, which is then forwarded to the original CF
Uni.createFrom().completionStage(cf).subscribeAsCompletionStage().cancel(true);

Assertions.assertTrue(cf.isCancelled());
Assertions.assertTrue(cf.isDone());
Assertions.assertTrue(cf.isCompletedExceptionally());
}

@ApplicationScoped
public static class BeanUsingMutiny {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

import org.eclipse.microprofile.context.ManagedExecutor;
import org.eclipse.microprofile.context.ThreadContext;
Expand All @@ -23,6 +25,7 @@
import io.smallrye.context.SmallRyeManagedExecutor;
import io.smallrye.context.SmallRyeThreadContext;
import io.smallrye.context.impl.DefaultValues;
import io.smallrye.mutiny.infrastructure.Infrastructure;

/**
* The runtime value service used to create values related to the MP-JWT services
Expand Down Expand Up @@ -163,6 +166,13 @@ public void run() {
});
//Avoid leaking the classloader:
SmallRyeContextPropagationRecorder.builder = null;

Infrastructure.setCompletableFutureWrapper(new UnaryOperator<CompletableFuture<?>>() {
public CompletableFuture<?> apply(CompletableFuture<?> t) {
SmallRyeThreadContext threadContext = SmallRyeThreadContext.getCurrentThreadContextOrDefaultContexts();
return threadContext.copy(t);
}
});
}

public Supplier<Object> initializeManagedExecutor(ExecutorService executorService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ public void testFailure() throws InterruptedException {
verifyFailure("foo-completion-stage", "java.lang.NullPointerException: Something is null", false);
verifyFailure("foo-completion-stage-failure", "boom", true);
verifyFailure("foo-uni", "java.lang.NullPointerException: Something is null", false);
verifyFailure("foo-uni-failure", "java.io.IOException: boom", true);
verifyFailure("foo-uni-failure", "boom", true);

verifyFailure("foo-blocking", "java.lang.IllegalStateException: Red is dead", false);
verifyFailure("foo-message-blocking", "java.lang.NullPointerException", false);
verifyFailure("foo-completion-stage-blocking", "java.lang.NullPointerException: Something is null", false);
verifyFailure("foo-completion-stage-failure-blocking", "boom", true);
verifyFailure("foo-uni-blocking", "java.lang.NullPointerException: Something is null", false);
verifyFailure("foo-uni-failure-blocking", "java.io.IOException: boom", true);
verifyFailure("foo-uni-failure-blocking", "boom", true);
}

@Test
Expand Down