diff --git a/docs/config_properties.md b/docs/config_properties.md index c86ada23..f9e2dc0c 100644 --- a/docs/config_properties.md +++ b/docs/config_properties.md @@ -29,6 +29,8 @@ | spark.kubernetes.operator.metrics.clientMetricsEnabled | Boolean | true | false | Enable KubernetesClient metrics for measuring the HTTP traffic to the Kubernetes API Server. Since the metrics is collected via Okhttp interceptors, can be disabled when opt in customized interceptors. | | spark.kubernetes.operator.metrics.clientMetricsGroupByResponseCodeEnabled | Boolean | true | false | When enabled, additional metrics group by http response code group(1xx, 2xx, 3xx, 4xx, 5xx) received from API server will be added. Users can disable it when their monitoring system can combine lower level kubernetes.client.http.response.<3-digit-response-code> metrics. | | spark.kubernetes.operator.metrics.port | Integer | 19090 | false | The port used for checking metrics | + | spark.kubernetes.operator.metrics.prometheusTextBasedFormatEnabled | Boolean | true | false | Whether or not to enable text-based format for Prometheus 2.0, as recommended by https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format | + | spark.kubernetes.operator.metrics.sanitizePrometheusMetricsNameEnabled | Boolean | true | false | Whether or not to enable automatic name sanitizing for all metrics based on best-practice guide from Prometheus https://prometheus.io/docs/practices/naming/ | | spark.kubernetes.operator.health.probePort | Integer | 19091 | false | The port used for health/readiness check probe status. | | spark.kubernetes.operator.health.sentinelExecutorPoolSize | Integer | 3 | false | Size of executor service in Sentinel Managers to check the health of sentinel resources. | | spark.kubernetes.operator.health.sentinelResourceReconciliationDelaySeconds | Integer | 60 | true | Allowed max time(seconds) between spec update and reconciliation for sentinel resources. | diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java index 3aca8cd5..b3b352ef 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/config/SparkOperatorConf.java @@ -334,6 +334,30 @@ public final class SparkOperatorConf { .defaultValue(19090) .build(); + public static final ConfigOption PROMETHEUS_TEXT_BASED_FORMAT_ENABLED = + ConfigOption.builder() + .key("spark.kubernetes.operator.metrics.prometheusTextBasedFormatEnabled") + .enableDynamicOverride(false) + .description( + "Whether or not to enable text-based format for Prometheus 2.0, as " + + "recommended by " + + "https://prometheus.io/docs/instrumenting/exposition_formats/#text-based-format") + .typeParameterClass(Boolean.class) + .defaultValue(true) + .build(); + + public static final ConfigOption SANITIZE_PROMETHEUS_METRICS_NAME_ENABLED = + ConfigOption.builder() + .key("spark.kubernetes.operator.metrics.sanitizePrometheusMetricsNameEnabled") + .enableDynamicOverride(false) + .description( + "Whether or not to enable automatic name sanitizing for all metrics based on " + + "best-practice guide from Prometheus " + + "https://prometheus.io/docs/practices/naming/") + .typeParameterClass(Boolean.class) + .defaultValue(true) + .build(); + public static final ConfigOption OPERATOR_PROBE_PORT = ConfigOption.builder() .key("spark.kubernetes.operator.health.probePort") diff --git a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java index d5a72a8c..8592e185 100644 --- a/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java +++ b/spark-operator/src/main/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandler.java @@ -29,21 +29,38 @@ import java.util.Map; import java.util.Properties; +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; import com.sun.net.httpserver.HttpExchange; import com.sun.net.httpserver.HttpHandler; import jakarta.servlet.http.HttpServletRequest; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.spark.k8s.operator.config.SparkOperatorConf; import org.apache.spark.metrics.sink.PrometheusServlet; /** Serves as simple Prometheus sink (pull model), presenting metrics snapshot as HttpHandler. */ @Slf4j public class PrometheusPullModelHandler extends PrometheusServlet implements HttpHandler { private static final String EMPTY_RECORD_VALUE = "[]"; + @Getter private final MetricRegistry registry; + @Getter private final boolean enablePrometheusTextBasedFormat; + @Getter private final boolean enableSanitizePrometheusMetricsName; public PrometheusPullModelHandler(Properties properties, MetricRegistry registry) { super(properties, registry); + this.registry = registry; + this.enablePrometheusTextBasedFormat = + SparkOperatorConf.PROMETHEUS_TEXT_BASED_FORMAT_ENABLED.getValue(); + this.enableSanitizePrometheusMetricsName = + SparkOperatorConf.SANITIZE_PROMETHEUS_METRICS_NAME_ENABLED.getValue(); } @Override @@ -58,13 +75,21 @@ public void stop() { @Override public void handle(HttpExchange exchange) throws IOException { - HttpServletRequest httpServletRequest = null; - String value = getMetricsSnapshot(httpServletRequest); - sendMessage( - exchange, - HTTP_OK, - String.join("\n", filterNonEmptyRecords(value)), - Map.of("Content-Type", Collections.singletonList("text/plain;version=0.0.4"))); + if (enablePrometheusTextBasedFormat) { + sendMessage( + exchange, + HTTP_OK, + formatMetricsSnapshot(), + Map.of("Content-Type", Collections.singletonList("text/plain;version=0.0.4"))); + } else { + HttpServletRequest httpServletRequest = null; + String value = getMetricsSnapshot(httpServletRequest); + sendMessage( + exchange, + HTTP_OK, + String.join("\n", filterNonEmptyRecords(value)), + Map.of("Content-Type", Collections.singletonList("text/plain;version=0.0.4"))); + } } protected List filterNonEmptyRecords(String metricsSnapshot) { @@ -82,4 +107,254 @@ protected List filterNonEmptyRecords(String metricsSnapshot) { } return filteredRecords; } + + protected String formatMetricsSnapshot() { + Map gauges = registry.getGauges(); + Map counters = registry.getCounters(); + Map histograms = registry.getHistograms(); + Map meters = registry.getMeters(); + Map timers = registry.getTimers(); + + StringBuilder stringBuilder = new StringBuilder(); + + for (Map.Entry entry : gauges.entrySet()) { + appendIfNotEmpty(stringBuilder, formatGauge(entry.getKey(), entry.getValue())); + } + + // Counters + for (Map.Entry entry : counters.entrySet()) { + String name = sanitize(entry.getKey()) + "_total"; + Counter counter = entry.getValue(); + appendIfNotEmpty(stringBuilder, formatCounter(name, counter)); + } + + // Histograms + for (Map.Entry entry : histograms.entrySet()) { + appendIfNotEmpty(stringBuilder, formatHistogram(entry.getKey(), entry.getValue())); + } + + // Meters + for (Map.Entry entry : meters.entrySet()) { + appendIfNotEmpty(stringBuilder, formatMeter(entry.getKey(), entry.getValue())); + } + + // Timers (Meter + Histogram in nanoseconds) + for (Map.Entry entry : timers.entrySet()) { + appendIfNotEmpty(stringBuilder, formatTimer(entry.getKey(), entry.getValue())); + } + return stringBuilder.toString(); + } + + protected void appendIfNotEmpty(StringBuilder stringBuilder, String value) { + if (StringUtils.isNotEmpty(value)) { + stringBuilder.append(value); + } + } + + protected String formatGauge(String name, Gauge gauge) { + if (gauge != null + && gauge.getValue() != null + && !EMPTY_RECORD_VALUE.equals(gauge.getValue()) + && gauge.getValue() instanceof Number) { + String formattedName = sanitize(name); + return "# HELP " + + formattedName + + " Gauge metric\n" + + "# TYPE " + + formattedName + + " gauge\n" + + formattedName + + ' ' + + gauge.getValue() + + "\n\n"; + } + return null; + } + + protected String formatCounter(String name, Counter counter) { + if (counter != null) { + String formattedName = sanitize(name); + return "# HELP " + + formattedName + + " Counter metric\n" + + "# TYPE " + + formattedName + + " counter\n" + + formattedName + + " " + + counter.getCount() + + "\n\n"; + } + return null; + } + + protected String formatHistogram(String name, Histogram histogram) { + if (histogram != null && histogram.getSnapshot() != null) { + String baseName = sanitize(name); + Snapshot snap = histogram.getSnapshot(); + long count = histogram.getCount(); + boolean isNanosHistogram = baseName.contains("nanos"); + if (isNanosHistogram) { + baseName = nanosMetricsNameToSeconds(baseName); + } + double sum = + isNanosHistogram ? nanosToSeconds(snap.getMean() * count) : snap.getMean() * count; + return "# HELP " + + baseName + + " Histogram metric\n# TYPE " + + baseName + + " summary\n" + + baseName + + "{quantile=\"0.5\"} " + + (isNanosHistogram ? nanosToSeconds(snap.getMedian()) : snap.getMean()) + + "\n" + + baseName + + "{quantile=\"0.75\"} " + + (isNanosHistogram ? nanosToSeconds(snap.get75thPercentile()) : snap.get75thPercentile()) + + "\n" + + baseName + + "{quantile=\"0.95\"} " + + (isNanosHistogram ? nanosToSeconds(snap.get95thPercentile()) : snap.get95thPercentile()) + + "\n" + + baseName + + "{quantile=\"0.98\"} " + + (isNanosHistogram ? nanosToSeconds(snap.get98thPercentile()) : snap.get98thPercentile()) + + "\n" + + baseName + + "{quantile=\"0.99\"} " + + (isNanosHistogram ? nanosToSeconds(snap.get99thPercentile()) : snap.get99thPercentile()) + + "\n" + + baseName + + "{quantile=\"0.999\"} " + + (isNanosHistogram + ? nanosToSeconds(snap.get999thPercentile()) + : snap.get99thPercentile()) + + "\n" + + baseName + + "_count " + + count + + "\n" + + baseName + + "_sum " + + sum + + "\n\n"; + } + return null; + } + + protected String formatMeter(String name, Meter meter) { + if (meter != null) { + String baseName = sanitize(name); + return "# HELP " + + baseName + + "_total Meter count\n# TYPE " + + baseName + + "_total counter\n" + + baseName + + "_total " + + meter.getCount() + + "\n\n# TYPE " + + baseName + + "_rate gauge\n" + + baseName + + "_m1_rate " + + meter.getOneMinuteRate() + + "\n" + + baseName + + "_m5_rate " + + meter.getFiveMinuteRate() + + "\n" + + baseName + + "_m15_rate " + + meter.getFifteenMinuteRate() + + "\n\n"; + } + return null; + } + + protected String formatTimer(String name, Timer timer) { + if (timer != null && timer.getSnapshot() != null) { + String baseName = sanitize(name); + Snapshot snap = timer.getSnapshot(); + long count = timer.getCount(); + return "# HELP " + + baseName + + "_duration_seconds Timer summary\n# TYPE " + + baseName + + "_duration_seconds summary\n" + + "\n" + + baseName + + "_duration_seconds" + + "{quantile=\"0.5\"} " + + nanosToSeconds(snap.getMedian()) + + "\n" + + baseName + + "_duration_seconds" + + "{quantile=\"0.75\"} " + + nanosToSeconds(snap.get75thPercentile()) + + "\n" + + baseName + + "_duration_seconds" + + "{quantile=\"0.95\"} " + + nanosToSeconds(snap.get95thPercentile()) + + "\n" + + baseName + + "_duration_seconds" + + "{quantile=\"0.98\"} " + + nanosToSeconds(snap.get98thPercentile()) + + "\n" + + baseName + + "_duration_seconds" + + "{quantile=\"0.99\"} " + + nanosToSeconds(snap.get99thPercentile()) + + "\n" + + baseName + + "_duration_seconds" + + "{quantile=\"0.999\"} " + + nanosToSeconds(snap.get999thPercentile()) + + "\n" + + baseName + + "_duration_seconds_count " + + count + + "\n" + + baseName + + "_duration_seconds_sum " + + nanosToSeconds(snap.getMean() * count) + + "\n\n# TYPE " + + baseName + + " gauge\n" + + baseName + + "_count " + + count + + "\n" + + baseName + + "_m1_rate " + + timer.getOneMinuteRate() + + "\n" + + baseName + + "_m5_rate " + + timer.getFiveMinuteRate() + + "\n" + + baseName + + "_m15_rate " + + timer.getFifteenMinuteRate() + + "\n\n"; + } + return null; + } + + protected double nanosToSeconds(double nanos) { + return nanos / 1_000_000_000.0; + } + + protected String sanitize(String name) { + if (enableSanitizePrometheusMetricsName) { + return name.replaceAll("[^a-zA-Z0-9_]", "_").toLowerCase(); + } + return name; + } + + protected String nanosMetricsNameToSeconds(String name) { + return name.replaceAll("_nanos", "_seconds"); + } } diff --git a/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandlerTest.java b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandlerTest.java new file mode 100644 index 00000000..7efdc23f --- /dev/null +++ b/spark-operator/src/test/java/org/apache/spark/k8s/operator/metrics/PrometheusPullModelHandlerTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.k8s.operator.metrics; + +import static org.junit.jupiter.api.Assertions.*; + +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.Objects; +import java.util.Properties; + +import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Timer; +import org.junit.jupiter.api.Test; + +class PrometheusPullModelHandlerTest { + + @Test + void testFormatMetricsSnapshotIncludesGauge() throws Exception { + MetricRegistry registry = new MetricRegistry(); + registry.register("foo_gauge", (Gauge) () -> 42); + + PrometheusPullModelHandler handler = new PrometheusPullModelHandler(new Properties(), registry); + + String output = handler.formatMetricsSnapshot(); + assertTrue(output.contains("# TYPE foo_gauge gauge")); + assertTrue(output.contains("foo_gauge 42")); + } + + @Test + void testFormatMetricsSnapshotIncludesCounter() throws Exception { + MetricRegistry registry = new MetricRegistry(); + Counter counter = registry.counter("foo_counter"); + counter.inc(5); + + PrometheusPullModelHandler handler = new PrometheusPullModelHandler(new Properties(), registry); + + String output = handler.formatMetricsSnapshot(); + assertTrue(output.contains("# TYPE foo_counter_total counter")); + assertTrue(output.contains("foo_counter_total 5")); + } + + @Test + void testFormatMetricsSnapshotIncludesHistogram() throws Exception { + MetricRegistry registry = new MetricRegistry(); + Histogram histogram = registry.histogram("foo_histogram"); + histogram.update(100); + histogram.update(200); + + PrometheusPullModelHandler handler = new PrometheusPullModelHandler(new Properties(), registry); + + String output = handler.formatMetricsSnapshot(); + + assertTrue(output.contains("# TYPE foo_histogram summary")); + assertTrue(output.contains("foo_histogram_count 2")); + assertTrue(output.contains("foo_histogram_sum")); + } + + @Test + void testFormatMetricsSnapshotIncludesHistogramWithNanos() throws Exception { + MetricRegistry registry = new MetricRegistry(); + Histogram histogram = registry.histogram("foo_nanos_histogram"); + histogram.update(563682); + histogram.update(716252); + histogram.update(292098); + + PrometheusPullModelHandler handler = new PrometheusPullModelHandler(new Properties(), registry); + + String output = handler.formatMetricsSnapshot(); + + assertTrue(output.contains("# TYPE foo_seconds_histogram summary")); + assertTrue(output.contains("foo_seconds_histogram_count 3")); + assertTrue(output.contains("foo_seconds_histogram_sum 0.001572032")); + } + + @Test + void testFormatMetricsSnapshotIncludesMeter() throws Exception { + MetricRegistry registry = new MetricRegistry(); + Meter meter = registry.meter("foo_meter"); + meter.mark(3); + + PrometheusPullModelHandler handler = new PrometheusPullModelHandler(new Properties(), registry); + + String output = handler.formatMetricsSnapshot(); + assertTrue(output.contains("# TYPE foo_meter_total counter")); + assertTrue(output.contains("foo_meter_total 3")); + assertTrue(output.contains("foo_meter_m1_rate")); + } + + @Test + void testFormatMetricsSnapshotIncludesTimer() throws Exception { + MetricRegistry registry = new MetricRegistry(); + Timer timer = registry.timer("foo_timer"); + + timer.update(Duration.of(500, ChronoUnit.MILLIS)); + timer.update(Duration.of(1000, ChronoUnit.MILLIS)); + PrometheusPullModelHandler handler = new PrometheusPullModelHandler(new Properties(), registry); + + String output = handler.formatMetricsSnapshot(); + assertTrue(output.contains("# TYPE foo_timer_duration_seconds summary")); + assertTrue(output.contains("foo_timer_duration_seconds_count 2")); + assertTrue(output.contains("foo_timer_duration_seconds_sum 1.5")); + assertTrue(output.contains("foo_timer_m1_rate")); + } + + @Test + void testFormatMetricsSnapshotIncludesEmpty() throws Exception { + MetricRegistry registry = new MetricRegistry(); + registry.register("foo_gauge", (Gauge) () -> null); + PrometheusPullModelHandler handler = new PrometheusPullModelHandler(new Properties(), registry); + + String output = handler.formatMetricsSnapshot(); + assertFalse(output.contains("null")); + } +}