Skip to content

Commit f30ee66

Browse files
AsyncReporter/SpanHandler: make queuedMaxBytes=0 disable pre-flight size checks (#260)
Signed-off-by: Andriy Redko <[email protected]> Signed-off-by: Adrian Cole <[email protected]> Co-authored-by: Adrian Cole <[email protected]>
1 parent 0c62f8b commit f30ee66

File tree

10 files changed

+588
-72
lines changed

10 files changed

+588
-72
lines changed

benchmarks/src/test/java/zipkin2/reporter/internal/AsyncReporterBenchmarks.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
import java.util.concurrent.TimeUnit;
88
import java.util.concurrent.atomic.AtomicLong;
9+
import java.util.stream.Stream;
10+
911
import org.openjdk.jmh.annotations.AuxCounters;
1012
import org.openjdk.jmh.annotations.Benchmark;
1113
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -24,6 +26,7 @@
2426
import org.openjdk.jmh.annotations.Warmup;
2527
import zipkin2.Span;
2628
import zipkin2.TestObjects;
29+
import zipkin2.reporter.BytesEncoder;
2730
import zipkin2.reporter.Encoding;
2831
import zipkin2.reporter.InMemoryReporterMetrics;
2932
import zipkin2.reporter.SpanBytesEncoder;
@@ -42,6 +45,9 @@ public class AsyncReporterBenchmarks {
4245
@Param
4346
public Encoding encoding;
4447

48+
@Param({"0", "20000000"})
49+
public int maxBytes;
50+
4551
@AuxCounters
4652
@State(Scope.Thread)
4753
public static class InMemoryReporterMetricsAsCounters {
@@ -77,10 +83,17 @@ public void clean() {
7783

7884
@Setup(Level.Trial)
7985
public void setup() {
86+
final BytesEncoder<Span> encoder = Stream
87+
.of(SpanBytesEncoder.JSON_V2, SpanBytesEncoder.PROTO3, SpanBytesEncoder.THRIFT)
88+
.filter(e -> e.encoding().equals(encoding))
89+
.findAny()
90+
.orElseThrow(() -> new IllegalStateException("Unable to find BytesEncoder<Span> for " + encoding));
91+
8092
reporter = AsyncReporter.newBuilder(new NoopSender(encoding))
8193
.messageMaxBytes(1000000) // example default from Kafka message.max.bytes
94+
.queuedMaxBytes(maxBytes)
8295
.metrics(metrics)
83-
.build(SpanBytesEncoder.JSON_V2);
96+
.build(encoder);
8497
}
8598

8699
@Benchmark @Group("no_contention") @GroupThreads(1)
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Copyright The OpenZipkin Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
package zipkin2.reporter.internal;
6+
7+
import java.util.concurrent.TimeUnit;
8+
import org.openjdk.jmh.annotations.AuxCounters;
9+
import org.openjdk.jmh.annotations.Benchmark;
10+
import org.openjdk.jmh.annotations.BenchmarkMode;
11+
import org.openjdk.jmh.annotations.Fork;
12+
import org.openjdk.jmh.annotations.Group;
13+
import org.openjdk.jmh.annotations.GroupThreads;
14+
import org.openjdk.jmh.annotations.Level;
15+
import org.openjdk.jmh.annotations.Measurement;
16+
import org.openjdk.jmh.annotations.Mode;
17+
import org.openjdk.jmh.annotations.OutputTimeUnit;
18+
import org.openjdk.jmh.annotations.Param;
19+
import org.openjdk.jmh.annotations.Scope;
20+
import org.openjdk.jmh.annotations.Setup;
21+
import org.openjdk.jmh.annotations.State;
22+
import org.openjdk.jmh.annotations.TearDown;
23+
import org.openjdk.jmh.annotations.Warmup;
24+
import org.openjdk.jmh.runner.Runner;
25+
import org.openjdk.jmh.runner.RunnerException;
26+
import org.openjdk.jmh.runner.options.Options;
27+
import org.openjdk.jmh.runner.options.OptionsBuilder;
28+
29+
@Measurement(iterations = 5, time = 1)
30+
@Warmup(iterations = 10, time = 1)
31+
@Fork(3)
32+
@BenchmarkMode(Mode.Throughput)
33+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
34+
@State(Scope.Group)
35+
public class BoundedQueueBenchmarks {
36+
static final byte ONE = 1;
37+
38+
@Param( {"0", "10000"})
39+
public int maxBytes;
40+
41+
@AuxCounters
42+
@State(Scope.Thread)
43+
public static class OfferCounters {
44+
public int offersFailed;
45+
public int offersMade;
46+
47+
@Setup(Level.Iteration)
48+
public void clean() {
49+
offersFailed = offersMade = 0;
50+
}
51+
}
52+
53+
@AuxCounters
54+
@State(Scope.Thread)
55+
public static class DrainCounters {
56+
public int drained;
57+
58+
@Setup(Level.Iteration)
59+
public void clean() {
60+
drained = 0;
61+
}
62+
}
63+
64+
private static ThreadLocal<Object> marker = new ThreadLocal<>();
65+
66+
@State(Scope.Thread)
67+
public static class ConsumerMarker {
68+
public ConsumerMarker() {
69+
marker.set(this);
70+
}
71+
}
72+
73+
BoundedQueue<Byte> q;
74+
75+
@Setup
76+
public void setup() {
77+
q = BoundedQueue.create(null, null, null, 10000, 10000, maxBytes);
78+
}
79+
80+
@Benchmark @Group("no_contention") @GroupThreads(1)
81+
public void no_contention_offer(OfferCounters counters) {
82+
if (q.offer(ONE, 1)) {
83+
counters.offersMade++;
84+
} else {
85+
counters.offersFailed++;
86+
}
87+
}
88+
89+
@Benchmark @Group("no_contention") @GroupThreads(1)
90+
public void no_contention_drain(DrainCounters counters, ConsumerMarker cm) {
91+
q.drainTo((s, b) -> {
92+
counters.drained++;
93+
return true;
94+
}, 1000);
95+
}
96+
97+
@Benchmark @Group("mild_contention") @GroupThreads(2)
98+
public void mild_contention_offer(OfferCounters counters) {
99+
if (q.offer(ONE, 1)) {
100+
counters.offersMade++;
101+
} else {
102+
counters.offersFailed++;
103+
}
104+
}
105+
106+
@Benchmark @Group("mild_contention") @GroupThreads(1)
107+
public void mild_contention_drain(DrainCounters counters, ConsumerMarker cm) {
108+
q.drainTo((s, b) -> {
109+
counters.drained++;
110+
return true;
111+
}, 1000);
112+
}
113+
114+
@Benchmark @Group("high_contention") @GroupThreads(8)
115+
public void high_contention_offer(OfferCounters counters) {
116+
if (q.offer(ONE, 1)) {
117+
counters.offersMade++;
118+
} else {
119+
counters.offersFailed++;
120+
}
121+
}
122+
123+
@Benchmark @Group("high_contention") @GroupThreads(1)
124+
public void high_contention_drain(DrainCounters counters, ConsumerMarker cm) {
125+
q.drainTo((s, b) -> {
126+
counters.drained++;
127+
return true;
128+
}, 1000);
129+
}
130+
131+
@TearDown(Level.Iteration)
132+
public void emptyQ() {
133+
// If this thread didn't drain, return
134+
if (marker.get() == null) return;
135+
q.clear();
136+
}
137+
138+
// Convenience main entry-point
139+
public static void main(String[] args) throws RunnerException {
140+
Options opt = new OptionsBuilder()
141+
.include(".*" + BoundedQueueBenchmarks.class.getSimpleName() + ".*")
142+
.build();
143+
144+
new Runner(opt).run();
145+
}
146+
}

benchmarks/src/test/java/zipkin2/reporter/internal/ByteBoundedQueueBenchmarks.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public ConsumerMarker() {
7070

7171
@Setup
7272
public void setup() {
73-
q = new ByteBoundedQueue<>(10000, 10000);
73+
q = new ByteBoundedQueue<>(null, null, null, 10000, 10000, 10000);
7474
}
7575

7676
@Benchmark @Group("no_contention") @GroupThreads(1)

core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@
4545
* @param <S> type of the span, usually {@code zipkin2.Span}
4646
* @since 3.0
4747
*/
48-
public abstract class AsyncReporter<S> extends Component implements Reporter<S>, Closeable, Flushable {
48+
public abstract class AsyncReporter<S> extends Component
49+
implements Reporter<S>, Closeable, Flushable {
4950
public static Builder newBuilder(BytesMessageSender sender) {
5051
return new Builder(sender);
5152
}
@@ -82,8 +83,8 @@ public static final class Builder {
8283
this.messageMaxBytes = asyncReporter.messageMaxBytes;
8384
this.messageTimeoutNanos = asyncReporter.messageTimeoutNanos;
8485
this.closeTimeoutNanos = asyncReporter.closeTimeoutNanos;
85-
this.queuedMaxSpans = asyncReporter.pending.maxSize;
86-
this.queuedMaxBytes = asyncReporter.pending.maxBytes;
86+
this.queuedMaxSpans = asyncReporter.pending.maxSize();
87+
this.queuedMaxBytes = asyncReporter.queuedMaxBytes;
8788
}
8889

8990
static int onePercentOfMemory() {
@@ -181,8 +182,9 @@ static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
181182
static final Logger logger = Logger.getLogger(BoundedAsyncReporter.class.getName());
182183
final AtomicBoolean started, closed;
183184
final BytesEncoder<S> encoder;
184-
final ByteBoundedQueue<S> pending;
185+
final BoundedQueue<S> pending;
185186
final BytesMessageSender sender;
187+
final int queuedMaxBytes;
186188
final int messageMaxBytes;
187189
final long messageTimeoutNanos, closeTimeoutNanos;
188190
final CountDownLatch close;
@@ -193,8 +195,10 @@ static final class BoundedAsyncReporter<S> extends AsyncReporter<S> {
193195
private boolean shouldWarnException = true;
194196

195197
BoundedAsyncReporter(Builder builder, BytesEncoder<S> encoder) {
196-
this.pending = new ByteBoundedQueue<S>(builder.queuedMaxSpans, builder.queuedMaxBytes);
198+
this.pending = BoundedQueue.create(encoder, builder.sender, builder.metrics,
199+
builder.messageMaxBytes, builder.queuedMaxSpans, builder.queuedMaxBytes);
197200
this.sender = builder.sender;
201+
this.queuedMaxBytes = builder.queuedMaxBytes;
198202
this.messageMaxBytes = builder.messageMaxBytes;
199203
this.messageTimeoutNanos = builder.messageTimeoutNanos;
200204
this.closeTimeoutNanos = builder.closeTimeoutNanos;
@@ -216,18 +220,15 @@ void startFlusherThread() {
216220
flushThread.start();
217221
}
218222

223+
@SuppressWarnings("unchecked")
219224
@Override public void report(S next) {
220225
if (next == null) throw new NullPointerException("span == null");
221226
// Lazy start so that reporters never used don't spawn threads
222227
if (started.compareAndSet(false, true)) startFlusherThread();
223228
metrics.incrementSpans(1);
224-
int nextSizeInBytes = encoder.sizeInBytes(next);
225-
int messageSizeOfNextSpan = sender.messageSizeInBytes(nextSizeInBytes);
226-
metrics.incrementSpanBytes(nextSizeInBytes);
227-
if (closed.get() ||
228-
// don't enqueue something larger than we can drain
229-
messageSizeOfNextSpan > messageMaxBytes ||
230-
!pending.offer(next, nextSizeInBytes)) {
229+
230+
// enqueue now and filter our when we drain
231+
if (closed.get() || !pending.offer(next)) {
231232
metrics.incrementSpansDropped(1);
232233
}
233234
}
@@ -240,10 +241,6 @@ void startFlusherThread() {
240241
void flush(BufferNextMessage<S> bundler) {
241242
pending.drainTo(bundler, bundler.remainingNanos());
242243

243-
// record after flushing reduces the amount of gauge events vs on doing this on report
244-
metrics.updateQueuedSpans(pending.count);
245-
metrics.updateQueuedBytes(pending.sizeInBytes);
246-
247244
// loop around if we are running, and the bundle isn't full
248245
// if we are closed, try to send what's pending
249246
if (!bundler.isReady() && !closed.get()) return;
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright The OpenZipkin Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package zipkin2.reporter.internal;
7+
8+
import zipkin2.reporter.BytesEncoder;
9+
import zipkin2.reporter.BytesMessageSender;
10+
import zipkin2.reporter.ReporterMetrics;
11+
12+
/**
13+
* Multi-producer, multi-consumer queue that could be bounded by count or/and size.
14+
*/
15+
abstract class BoundedQueue<S> implements SpanWithSizeConsumer<S> {
16+
static <S> BoundedQueue<S> create(BytesEncoder<S> encoder, BytesMessageSender sender,
17+
ReporterMetrics metrics, int messageMaxBytes, int maxSize, int maxBytes) {
18+
if (maxBytes > 0) {
19+
return new ByteBoundedQueue<S>(encoder, sender, metrics, messageMaxBytes, maxSize, maxBytes);
20+
} else {
21+
return new CountBoundedQueue<S>(encoder, sender, metrics, messageMaxBytes, maxSize);
22+
}
23+
}
24+
25+
/**
26+
* Max element's count of this bounded queue
27+
*/
28+
abstract int maxSize();
29+
30+
/**
31+
* Clear this bounded queue
32+
*/
33+
abstract int clear();
34+
35+
/**
36+
* Drains this bounded queue. Blocks for up to nanosTimeout for spans to appear.
37+
* Then, consume as many as possible.
38+
*/
39+
abstract int drainTo(SpanWithSizeConsumer<S> bundler, long remainingNanos);
40+
41+
/** Returns true if the element could be added or false if it could not. */
42+
abstract boolean offer(S next);
43+
}
44+
45+
interface SpanWithSizeConsumer<S> {
46+
/** Returns true if the element could be added or false if it could not due to its size. */
47+
boolean offer(S next, int nextSizeInBytes);
48+
}
49+

0 commit comments

Comments
 (0)