From 3cc9c2c54609d909ff3cab9b1a8aecdfaac82d7f Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 22 Jan 2024 15:09:37 +0200 Subject: [PATCH 1/2] fixes stress test expectation and bug in ColdTestPublisher Signed-off-by: Oleh Dokuka --- .../core/publisher/FluxSwitchMapStressTest.java | 17 +++++++++++++---- .../test/publisher/ColdTestPublisher.java | 3 ++- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxSwitchMapStressTest.java b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxSwitchMapStressTest.java index a6e275870a..b13a2a9a0e 100644 --- a/reactor-core/src/jcstress/java/reactor/core/publisher/FluxSwitchMapStressTest.java +++ b/reactor-core/src/jcstress/java/reactor/core/publisher/FluxSwitchMapStressTest.java @@ -272,7 +272,8 @@ public void arbiter(II_Result r) { // Ignore, flaky test (https://github.com/reactor/reactor-core/issues/3633) //@JCStressTest - @Outcome(id = {"200, 0", "200, 1"}, expect = ACCEPTABLE, desc = "Should produced exactly what was requested") + @Outcome(id = {"200, 0, 0", "200, 0, 1"}, expect = ACCEPTABLE, desc = "Should " + + "produced exactly what was requested") @State public static class RequestAndProduceStressTest2 extends FluxSwitchMapStressTest { @@ -312,9 +313,17 @@ public void outerRequest() { } @Arbiter - public void arbiter(II_Result r) { - r.r1 = (int) (stressSubscriber.onNextCalls.get() + switchMapMain.requested); - r.r2 = stressSubscriber.onCompleteCalls.get(); + public void arbiter(III_Result r) { + r.r1 = stressSubscriber.onNextCalls.get(); + r.r2 = (int) switchMapMain.requested; + r.r3 = stressSubscriber.onCompleteCalls.get(); + + switch (r.toString()) { + case "200, 0, 0": + case "200, 0, 1": + break; + default: throw new IllegalStateException(r + " " + fastLogger); + } if (stressSubscriber.onNextCalls.get() < 200 && stressSubscriber.onNextDiscarded.get() < switchMapMain.requested) { throw new IllegalStateException(r + " " + fastLogger); diff --git a/reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java b/reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java index 9385149dae..5790ed3923 100644 --- a/reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java +++ b/reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java @@ -103,6 +103,7 @@ public void subscribe(Subscriber s) { } s.onSubscribe(p); // will trigger drain() via request() + p.drain(); // ensures that empty source terminal signal is propagated without waiting for a request from the subscriber } boolean add(ColdTestPublisherSubscription s) { @@ -315,7 +316,7 @@ private void drain() { * @return true if the TestPublisher was terminated, false otherwise */ private boolean emitTerminalSignalIfAny() { - if (parent.done) { + if (parent.done && this.parent.values.size() == index) { parent.remove(this); final Throwable t = parent.error; From 648f56cc291ffb453d4c37b085e97005acddd20b Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Mon, 22 Jan 2024 22:27:44 +0200 Subject: [PATCH 2/2] fixes spotless Signed-off-by: Oleh Dokuka --- .../src/main/java/reactor/test/publisher/ColdTestPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java b/reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java index 5790ed3923..bf76c84830 100644 --- a/reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java +++ b/reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.