Skip to content

Commit 08f1eb0

Browse files
authored
Fix stress test expectation and bug in ColdTestPublisher (#3700)
This PR fixes an early termination bug in `ColdTestPublisher` which happens when request from the downstream subscriber races with new values and following termination which in rare case ends up in termination without propagation of the remaining values. Also this PR improves expectation for the stress test where ColdTestPulisher takes a key role. closes #3633 --------- Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 175dce4 commit 08f1eb0

File tree

2 files changed

+16
-6
lines changed

2 files changed

+16
-6
lines changed

reactor-core/src/jcstress/java/reactor/core/publisher/FluxSwitchMapStressTest.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,8 @@ public void arbiter(II_Result r) {
272272

273273
// Ignore, flaky test (https://github.com/reactor/reactor-core/issues/3633)
274274
//@JCStressTest
275-
@Outcome(id = {"200, 0", "200, 1"}, expect = ACCEPTABLE, desc = "Should produced exactly what was requested")
275+
@Outcome(id = {"200, 0, 0", "200, 0, 1"}, expect = ACCEPTABLE, desc = "Should " +
276+
"produced exactly what was requested")
276277
@State
277278
public static class RequestAndProduceStressTest2 extends FluxSwitchMapStressTest {
278279

@@ -312,9 +313,17 @@ public void outerRequest() {
312313
}
313314

314315
@Arbiter
315-
public void arbiter(II_Result r) {
316-
r.r1 = (int) (stressSubscriber.onNextCalls.get() + switchMapMain.requested);
317-
r.r2 = stressSubscriber.onCompleteCalls.get();
316+
public void arbiter(III_Result r) {
317+
r.r1 = stressSubscriber.onNextCalls.get();
318+
r.r2 = (int) switchMapMain.requested;
319+
r.r3 = stressSubscriber.onCompleteCalls.get();
320+
321+
switch (r.toString()) {
322+
case "200, 0, 0":
323+
case "200, 0, 1":
324+
break;
325+
default: throw new IllegalStateException(r + " " + fastLogger);
326+
}
318327

319328
if (stressSubscriber.onNextCalls.get() < 200 && stressSubscriber.onNextDiscarded.get() < switchMapMain.requested) {
320329
throw new IllegalStateException(r + " " + fastLogger);

reactor-test/src/main/java/reactor/test/publisher/ColdTestPublisher.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2017-2024 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -103,6 +103,7 @@ public void subscribe(Subscriber<? super T> s) {
103103
}
104104

105105
s.onSubscribe(p); // will trigger drain() via request()
106+
p.drain(); // ensures that empty source terminal signal is propagated without waiting for a request from the subscriber
106107
}
107108

108109
boolean add(ColdTestPublisherSubscription<T> s) {
@@ -315,7 +316,7 @@ private void drain() {
315316
* @return true if the TestPublisher was terminated, false otherwise
316317
*/
317318
private boolean emitTerminalSignalIfAny() {
318-
if (parent.done) {
319+
if (parent.done && this.parent.values.size() == index) {
319320
parent.remove(this);
320321

321322
final Throwable t = parent.error;

0 commit comments

Comments
 (0)