Skip to content

Commit 447c98d

Browse files
Bogdan DrutuMateusz Rzeszutekjonatan-ivanov
authored
Add capability to correctly export fixed buckets histogram to SignalFx (#2977)
* Add capability to correctly export fixed buckets histogram to SignalFx * Fix to always include the "+Inf" bucket. * Fix to never expire per bucket counters. * The metrics are still reported as Gauges by default, to fix this user has to enable a property. Signed-off-by: Bogdan Drutu <[email protected]> * Address feedback Signed-off-by: Bogdan Drutu <[email protected]> * Make SignalFx histograms cumulative without breaking max() * Hide DistributionStatisticsConfig changes behind a config flag * Polishing pass Co-authored-by: Mateusz Rzeszutek <[email protected]> Co-authored-by: Jonatan Ivanov <[email protected]>
1 parent 418b5ab commit 447c98d

File tree

6 files changed

+591
-17
lines changed

6 files changed

+591
-17
lines changed
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2022 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.signalfx;
17+
18+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
19+
20+
import java.time.Duration;
21+
import java.util.Arrays;
22+
23+
/**
24+
* Adds cumulative histogram capabilities to the {@link DistributionStatisticConfig}.
25+
*
26+
* @author Bogdan Drutu
27+
* @author Mateusz Rzeszutek
28+
*/
29+
final class CumulativeHistogramConfigUtil {
30+
31+
static DistributionStatisticConfig updateConfig(DistributionStatisticConfig distributionStatisticConfig) {
32+
double[] sloBoundaries = distributionStatisticConfig.getServiceLevelObjectiveBoundaries();
33+
if (sloBoundaries == null || sloBoundaries.length == 0) {
34+
return distributionStatisticConfig;
35+
}
36+
double[] newSloBoundaries = sloBoundaries;
37+
// Add the +Inf bucket since the "count" resets every export.
38+
if (!isPositiveInf(sloBoundaries[sloBoundaries.length - 1])) {
39+
newSloBoundaries = Arrays.copyOf(sloBoundaries, sloBoundaries.length + 1);
40+
newSloBoundaries[newSloBoundaries.length - 1] = Double.MAX_VALUE;
41+
}
42+
43+
return DistributionStatisticConfig.builder()
44+
// Set the expiration duration for the histogram counts to be effectively infinite.
45+
// Without this, the counts are reset every expiry duration.
46+
.expiry(Duration.ofNanos(Long.MAX_VALUE)) // effectively infinite
47+
.bufferLength(1)
48+
.serviceLevelObjectives(newSloBoundaries)
49+
.build()
50+
.merge(distributionStatisticConfig);
51+
}
52+
53+
private static boolean isPositiveInf(double bucket) {
54+
return bucket == Double.POSITIVE_INFINITY || bucket == Double.MAX_VALUE || (long) bucket == Long.MAX_VALUE;
55+
}
56+
57+
private CumulativeHistogramConfigUtil() {
58+
}
59+
}

implementations/micrometer-registry-signalfx/src/main/java/io/micrometer/signalfx/SignalFxConfig.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ default String accessToken() {
4242
return getSecret(this, "accessToken").required().get();
4343
}
4444

45+
/**
46+
* @return {@code true} if the SignalFx registry should emit cumulative histogram buckets.
47+
*/
48+
default boolean publishCumulativeHistogram() {
49+
return getBoolean(this, "publishCumulativeHistogram").orElse(false);
50+
}
51+
4552
/**
4653
* @return The URI to ship metrics to. If you need to publish metrics to an internal proxy en route to
4754
* SignalFx, you can define the location of the proxy with this.

implementations/micrometer-registry-signalfx/src/main/java/io/micrometer/signalfx/SignalFxMeterRegistry.java

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,21 @@
2323
import com.signalfx.metrics.errorhandler.OnSendErrorHandler;
2424
import com.signalfx.metrics.flush.AggregateMetricSender;
2525
import com.signalfx.metrics.protobuf.SignalFxProtocolBuffers;
26-
import io.micrometer.core.instrument.*;
26+
import io.micrometer.core.instrument.Clock;
27+
import io.micrometer.core.instrument.Counter;
28+
import io.micrometer.core.instrument.DistributionSummary;
29+
import io.micrometer.core.instrument.FunctionCounter;
30+
import io.micrometer.core.instrument.FunctionTimer;
31+
import io.micrometer.core.instrument.Gauge;
32+
import io.micrometer.core.instrument.LongTaskTimer;
33+
import io.micrometer.core.instrument.Meter;
34+
import io.micrometer.core.instrument.Tag;
35+
import io.micrometer.core.instrument.TimeGauge;
36+
import io.micrometer.core.instrument.Timer;
2737
import io.micrometer.core.instrument.config.NamingConvention;
38+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
39+
import io.micrometer.core.instrument.distribution.HistogramGauges;
40+
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
2841
import io.micrometer.core.instrument.step.StepMeterRegistry;
2942
import io.micrometer.core.instrument.util.MeterPartition;
3043
import io.micrometer.core.instrument.util.NamedThreadFactory;
@@ -41,6 +54,7 @@
4154
import java.util.stream.Stream;
4255

4356
import static com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType.COUNTER;
57+
import static com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType.CUMULATIVE_COUNTER;
4458
import static com.signalfx.metrics.protobuf.SignalFxProtocolBuffers.MetricType.GAUGE;
4559
import static java.util.stream.StreamSupport.stream;
4660

@@ -59,6 +73,7 @@ public class SignalFxMeterRegistry extends StepMeterRegistry {
5973
private final HttpEventProtobufReceiverFactory eventReceiverFactory;
6074
private final Set<OnSendErrorHandler> onSendErrorHandlerCollection = Collections.singleton(
6175
metricError -> this.logger.warn("failed to send metrics: {}", metricError.getMessage()));
76+
private final boolean publishCumulativeHistograms;
6277

6378
public SignalFxMeterRegistry(SignalFxConfig config, Clock clock) {
6479
this(config, clock, DEFAULT_THREAD_FACTORY);
@@ -71,16 +86,17 @@ public SignalFxMeterRegistry(SignalFxConfig config, Clock clock, ThreadFactory t
7186
URI apiUri = URI.create(config.uri());
7287
int port = apiUri.getPort();
7388
if (port == -1) {
74-
if ("http" .equals(apiUri.getScheme())) {
89+
if ("http".equals(apiUri.getScheme())) {
7590
port = 80;
76-
} else if ("https" .equals(apiUri.getScheme())) {
91+
} else if ("https".equals(apiUri.getScheme())) {
7792
port = 443;
7893
}
7994
}
8095

8196
SignalFxReceiverEndpoint signalFxEndpoint = new SignalFxEndpoint(apiUri.getScheme(), apiUri.getHost(), port);
8297
this.dataPointReceiverFactory = new HttpDataPointProtobufReceiverFactory(signalFxEndpoint);
8398
this.eventReceiverFactory = new HttpEventProtobufReceiverFactory(signalFxEndpoint);
99+
this.publishCumulativeHistograms = config.publishCumulativeHistogram();
84100

85101
config().namingConvention(new SignalFxNamingConvention());
86102

@@ -118,7 +134,27 @@ protected void publish() {
118134
}
119135
}
120136

121-
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addMeter(Meter meter) {
137+
@Override
138+
protected Timer newTimer(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, PauseDetector pauseDetector) {
139+
if (!publishCumulativeHistograms) {
140+
return super.newTimer(id, distributionStatisticConfig, pauseDetector);
141+
}
142+
Timer timer = new SignalfxTimer(id, clock, distributionStatisticConfig, pauseDetector, getBaseTimeUnit(), config.step().toMillis());
143+
HistogramGauges.registerWithCommonFormat(timer, this);
144+
return timer;
145+
}
146+
147+
@Override
148+
protected DistributionSummary newDistributionSummary(Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, double scale) {
149+
if (!publishCumulativeHistograms) {
150+
return super.newDistributionSummary(id, distributionStatisticConfig, scale);
151+
}
152+
DistributionSummary summary = new SignalfxDistributionSummary(id, clock, distributionStatisticConfig, scale, config.step().toMillis());
153+
HistogramGauges.registerWithCommonFormat(summary, this);
154+
return summary;
155+
}
156+
157+
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addMeter(Meter meter) {
122158
return stream(meter.measure().spliterator(), false).flatMap(measurement -> {
123159
String statSuffix = NamingConvention.camelCase.tagKey(measurement.getStatistic().toString());
124160
switch (measurement.getStatistic()) {
@@ -170,23 +206,28 @@ Stream<SignalFxProtocolBuffers.DataPoint.Builder> addLongTaskTimer(LongTaskTimer
170206
);
171207
}
172208

173-
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimeGauge(TimeGauge timeGauge) {
209+
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimeGauge(TimeGauge timeGauge) {
174210
return Stream.of(addDatapoint(timeGauge, GAUGE, null, timeGauge.value(getBaseTimeUnit())));
175211
}
176212

177-
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addGauge(Gauge gauge) {
213+
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addGauge(Gauge gauge) {
214+
if (publishCumulativeHistograms
215+
&& gauge.getId().syntheticAssociation() != null
216+
&& gauge.getId().getName().endsWith(".histogram")) {
217+
return Stream.of(addDatapoint(gauge, CUMULATIVE_COUNTER, null, gauge.value()));
218+
}
178219
return Stream.of(addDatapoint(gauge, GAUGE, null, gauge.value()));
179220
}
180221

181-
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addCounter(Counter counter) {
222+
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addCounter(Counter counter) {
182223
return Stream.of(addDatapoint(counter, COUNTER, null, counter.count()));
183224
}
184225

185-
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addFunctionCounter(FunctionCounter counter) {
226+
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addFunctionCounter(FunctionCounter counter) {
186227
return Stream.of(addDatapoint(counter, COUNTER, null, counter.count()));
187228
}
188229

189-
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimer(Timer timer) {
230+
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimer(Timer timer) {
190231
return Stream.of(
191232
addDatapoint(timer, COUNTER, "count", timer.count()),
192233
addDatapoint(timer, COUNTER, "totalTime", timer.totalTime(getBaseTimeUnit())),
@@ -195,15 +236,15 @@ private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addTimer(Timer timer)
195236
);
196237
}
197238

198-
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addFunctionTimer(FunctionTimer timer) {
239+
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addFunctionTimer(FunctionTimer timer) {
199240
return Stream.of(
200241
addDatapoint(timer, COUNTER, "count", timer.count()),
201242
addDatapoint(timer, COUNTER, "totalTime", timer.totalTime(getBaseTimeUnit())),
202243
addDatapoint(timer, GAUGE, "avg", timer.mean(getBaseTimeUnit()))
203244
);
204245
}
205246

206-
private Stream<SignalFxProtocolBuffers.DataPoint.Builder> addDistributionSummary(DistributionSummary summary) {
247+
Stream<SignalFxProtocolBuffers.DataPoint.Builder> addDistributionSummary(DistributionSummary summary) {
207248
return Stream.of(
208249
addDatapoint(summary, COUNTER, "count", summary.count()),
209250
addDatapoint(summary, COUNTER, "totalTime", summary.totalAmount()),
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright 2022 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.signalfx;
17+
18+
import io.micrometer.core.instrument.AbstractDistributionSummary;
19+
import io.micrometer.core.instrument.Clock;
20+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
21+
import io.micrometer.core.instrument.distribution.TimeWindowMax;
22+
import io.micrometer.core.instrument.step.StepTuple2;
23+
24+
import java.util.concurrent.atomic.DoubleAdder;
25+
import java.util.concurrent.atomic.LongAdder;
26+
27+
/**
28+
* This class is mostly the same as {@link io.micrometer.core.instrument.step.StepDistributionSummary}, with one notable
29+
* difference: the {@link DistributionStatisticConfig} is modified before being passed to the super class constructor -
30+
* that forces the histogram generated by this meter to be cumulative.
31+
*
32+
* @author Bogdan Drutu
33+
* @author Mateusz Rzeszutek
34+
*/
35+
final class SignalfxDistributionSummary extends AbstractDistributionSummary {
36+
37+
private final LongAdder count = new LongAdder();
38+
private final DoubleAdder total = new DoubleAdder();
39+
private final StepTuple2<Long, Double> countTotal;
40+
private final TimeWindowMax max;
41+
42+
SignalfxDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig, double scale, long stepMillis) {
43+
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), scale, false);
44+
this.countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0.0, count::sumThenReset, total::sumThenReset);
45+
max = new TimeWindowMax(clock, distributionStatisticConfig);
46+
}
47+
48+
@Override
49+
protected void recordNonNegative(double amount) {
50+
count.increment();
51+
total.add(amount);
52+
max.record(amount);
53+
}
54+
55+
@Override
56+
public long count() {
57+
return countTotal.poll1();
58+
}
59+
60+
@Override
61+
public double totalAmount() {
62+
return countTotal.poll2();
63+
}
64+
65+
@Override
66+
public double max() {
67+
return max.poll();
68+
}
69+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2022 VMware, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micrometer.signalfx;
17+
18+
import io.micrometer.core.instrument.AbstractTimer;
19+
import io.micrometer.core.instrument.Clock;
20+
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
21+
import io.micrometer.core.instrument.distribution.TimeWindowMax;
22+
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
23+
import io.micrometer.core.instrument.step.StepTuple2;
24+
import io.micrometer.core.instrument.util.TimeUtils;
25+
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.LongAdder;
28+
29+
/**
30+
* This class is mostly the same as {@link io.micrometer.core.instrument.step.StepTimer}, with one notable difference:
31+
* the {@link DistributionStatisticConfig} is modified before being passed to the super class constructor -
32+
* that forces the histogram generated by this meter to be cumulative.
33+
*
34+
* @author Bogdan Drutu
35+
* @author Mateusz Rzeszutek
36+
*/
37+
final class SignalfxTimer extends AbstractTimer {
38+
39+
private final LongAdder count = new LongAdder();
40+
private final LongAdder total = new LongAdder();
41+
private final StepTuple2<Long, Long> countTotal;
42+
private final TimeWindowMax max;
43+
44+
SignalfxTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig, PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis) {
45+
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), pauseDetector, baseTimeUnit, false);
46+
countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0L, count::sumThenReset, total::sumThenReset);
47+
max = new TimeWindowMax(clock, distributionStatisticConfig);
48+
}
49+
50+
@Override
51+
protected void recordNonNegative(long amount, TimeUnit unit) {
52+
final long nanoAmount = (long) TimeUtils.convert(amount, unit, TimeUnit.NANOSECONDS);
53+
count.increment();
54+
total.add(nanoAmount);
55+
max.record(amount, unit);
56+
}
57+
58+
@Override
59+
public long count() {
60+
return countTotal.poll1();
61+
}
62+
63+
@Override
64+
public double totalTime(TimeUnit unit) {
65+
return TimeUtils.nanosToUnit(countTotal.poll2(), unit);
66+
}
67+
68+
@Override
69+
public double max(TimeUnit unit) {
70+
return max.poll(unit);
71+
}
72+
}

0 commit comments

Comments
 (0)