From ac29886cc6e8629c0b7b83be95181b58f5b052a2 Mon Sep 17 00:00:00 2001 From: Raphael Silva Date: Mon, 11 Dec 2023 01:51:14 +0000 Subject: [PATCH 01/11] Fix: Don't return error for metric translation in the prw exporter Don't return error to the exporter helper in case of failures to translate from OpenTelemetry metrics to Prometheus metrics (which can happen due to several reasons). Instead log the error in warn level and try to send as much data as possible. Signed-off-by: Raphael Silva --- .chloggen/prw_failure_translate.yaml | 27 +++++++++++++++++++ .../prometheusremotewriteexporter/exporter.go | 7 +++-- .../exporter_test.go | 17 ++++++++---- 3 files changed, 44 insertions(+), 7 deletions(-) create mode 100755 .chloggen/prw_failure_translate.yaml diff --git a/.chloggen/prw_failure_translate.yaml b/.chloggen/prw_failure_translate.yaml new file mode 100755 index 0000000000000..6e654aa062c09 --- /dev/null +++ b/.chloggen/prw_failure_translate.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusremotewriteexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Don't drop all metrics in a batch in case some of them cannot be translated from Otel to Prometheus. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [29729] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. 
+subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 362efaf5f0445..4e336dc844dcc 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -26,6 +26,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" + "go.uber.org/zap" prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" @@ -134,15 +135,17 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings) if err != nil { - err = consumererror.NewPermanent(err) + prwe.settings.Logger.Warn("Failed to translate metrics: %s", zap.Error(err)) + prwe.settings.Logger.Warn("Exporting remaining %s metrics.", zap.Int("converted", len(tsMap))) } var m []*prompb.MetricMetadata if prwe.exporterSettings.SendMetadata { m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes) } + // Call export even if a conversion error, since there may be points that were successfully converted. 
- return multierr.Combine(err, prwe.handleExport(ctx, tsMap, m)) + return prwe.handleExport(ctx, tsMap, m) } } diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 9bcfcac1a2a0d..44ab20f74d1af 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -419,6 +419,11 @@ func Test_PushMetrics(t *testing.T) { emptySummaryBatch := getMetricsFromMetricList(invalidMetrics[emptySummary]) + // partial success (or partial failure) cases + + partialSuccess1 := getMetricsFromMetricList(validMetrics1[validSum], validMetrics2[validSum], + validMetrics1[validIntGauge], validMetrics2[validIntGauge], invalidMetrics[emptyGauge]) + // staleNaN cases staleNaNHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNHistogram]) staleNaNEmptyHistogramBatch := getMetricsFromMetricList(staleNaNMetrics[staleNaNEmptyHistogram]) @@ -469,7 +474,6 @@ func Test_PushMetrics(t *testing.T) { name: "invalid_type_case", metrics: invalidTypeBatch, httpResponseCode: http.StatusAccepted, - returnErr: true, }, { name: "intSum_case", @@ -570,28 +574,31 @@ func Test_PushMetrics(t *testing.T) { metrics: emptyDoubleGaugeBatch, reqTestFunc: checkFunc, httpResponseCode: http.StatusAccepted, - returnErr: true, }, { name: "emptyCumulativeSum_case", metrics: emptyCumulativeSumBatch, reqTestFunc: checkFunc, httpResponseCode: http.StatusAccepted, - returnErr: true, }, { name: "emptyCumulativeHistogram_case", metrics: emptyCumulativeHistogramBatch, reqTestFunc: checkFunc, httpResponseCode: http.StatusAccepted, - returnErr: true, }, { name: "emptySummary_case", metrics: emptySummaryBatch, reqTestFunc: checkFunc, httpResponseCode: http.StatusAccepted, - returnErr: true, + }, + { + name: "partialSuccess_case", + metrics: partialSuccess1, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedTimeSeries: 4, }, { name: 
"staleNaNIntGauge_case", From baa04c623ecc896412185fd3bcce4f8c454cdc92 Mon Sep 17 00:00:00 2001 From: Raphael Silva Date: Fri, 12 Jan 2024 17:04:30 +0000 Subject: [PATCH 02/11] Feat: Add telemetry for metric translation in the PRWE Signed-off-by: Raphael Silva --- .chloggen/prw_failure_translate.yaml | 4 +- .../prometheusremotewriteexporter/exporter.go | 53 ++++++++- .../exporter_test.go | 108 ++++++++++++------ .../prometheusremotewriteexporter/factory.go | 3 +- 4 files changed, 124 insertions(+), 44 deletions(-) diff --git a/.chloggen/prw_failure_translate.yaml b/.chloggen/prw_failure_translate.yaml index 6e654aa062c09..9188e8920b37e 100755 --- a/.chloggen/prw_failure_translate.yaml +++ b/.chloggen/prw_failure_translate.yaml @@ -1,13 +1,13 @@ # Use this changelog template to create an entry for release notes. # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: bug_fix +change_type: enhancement # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) component: prometheusremotewriteexporter # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Don't drop all metrics in a batch in case some of them cannot be translated from Otel to Prometheus. +note: Publish telemetry about translation of metrics from Otel to Prometheus. Don't drop all data points if some fail translation. # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
issues: [29729] diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 9df8ec687f9fc..295c1af0b50ca 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -25,13 +25,33 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/otel/metric" "go.uber.org/multierr" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/prometheusremotewriteexporter/internal/metadata" prometheustranslator "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheusremotewrite" ) +type prwTelemetry interface { + recordTranslationFailure(ctx context.Context) + recordTranslatedTimeSeries(ctx context.Context, numTS int) +} + +type prwTelemetryOtel struct { + failedTranslations metric.Int64Counter + translatedTimeSeries metric.Int64Counter +} + +func (p *prwTelemetryOtel) recordTranslationFailure(ctx context.Context) { + p.failedTranslations.Add(ctx, 1) +} + +func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS int) { + p.translatedTimeSeries.Add(ctx, int64(numTS)) +} + // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. 
type prwExporter struct { endpointURL *url.URL @@ -46,10 +66,33 @@ type prwExporter struct { retrySettings configretry.BackOffConfig wal *prweWAL exporterSettings prometheusremotewrite.Settings + telemetry prwTelemetry +} + +func newPRWTelemetry(set exporter.CreateSettings) prwTelemetry { + + meter := metadata.Meter(set.TelemetrySettings) + // TODO: create helper functions similar to the processor helper: BuildCustomMetricName + prefix := "exporter/" + metadata.Type + "/" + + failedTranslations, _ := meter.Int64Counter(prefix+"failed_translations", + metric.WithDescription("Number of translation operations that failed to translate metrics from OTEL to Prometheus"), + metric.WithUnit("1"), + ) + + translatedTimeSeries, _ := meter.Int64Counter(prefix+"translated_metrics", + metric.WithDescription("Number of Prometheus time series that were translated from OTEL metrics"), + metric.WithUnit("1"), + ) + + return &prwTelemetryOtel{ + failedTranslations: failedTranslations, + translatedTimeSeries: translatedTimeSeries, + } } // newPRWExporter initializes a new prwExporter instance and sets fields accordingly. 
-func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, error) { +func newPRWExporter(cfg *Config, set exporter.CreateSettings, telemetry prwTelemetry) (*prwExporter, error) { sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg) if err != nil { return nil, err @@ -80,6 +123,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, err AddMetricSuffixes: cfg.AddMetricSuffixes, SendMetadata: cfg.SendMetadata, }, + telemetry: telemetry, } if cfg.WAL == nil { return prwe, nil @@ -135,10 +179,13 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings) if err != nil { - prwe.settings.Logger.Warn("Failed to translate metrics: %s", zap.Error(err)) - prwe.settings.Logger.Warn("Exporting remaining %s metrics.", zap.Int("converted", len(tsMap))) + prwe.telemetry.recordTranslationFailure(ctx) + prwe.settings.Logger.Debug("failed to translate metrics %s", zap.Error(err)) + prwe.settings.Logger.Debug("exporting remaining %s metrics", zap.Int("translated", len(tsMap))) } + prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap)) + var m []*prompb.MetricMetadata if prwe.exporterSettings.SendMetadata { m = prometheusremotewrite.OtelMetricsToMetadata(md, prwe.exporterSettings.AddMetricSuffixes) diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 76848c0f3ffa7..a95810af62e87 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -111,7 +111,7 @@ func Test_NewPRWExporter(t *testing.T) { cfg.ExternalLabels = tt.externalLabels cfg.Namespace = tt.namespace cfg.RemoteWriteQueue.NumConsumers = 1 - prwe, err := newPRWExporter(cfg, tt.set) + prwe, err := newPRWExporter(cfg, tt.set, newPRWTelemetry(set)) if tt.returnErrorOnCreate { assert.Error(t, err) @@ -200,7 +200,7 @@ func 
Test_Start(t *testing.T) { cfg.RemoteWriteQueue.NumConsumers = 1 cfg.HTTPClientSettings = tt.clientSettings - prwe, err := newPRWExporter(cfg, tt.set) + prwe, err := newPRWExporter(cfg, tt.set, newPRWTelemetry(tt.set)) assert.NoError(t, err) assert.NotNil(t, prwe) @@ -360,7 +360,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error { set := exportertest.NewNopCreateSettings() set.BuildInfo = buildInfo // after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint - prwe, err := newPRWExporter(cfg, set) + prwe, err := newPRWExporter(cfg, set, newPRWTelemetry(set)) if err != nil { return err } @@ -372,6 +372,19 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error { return prwe.handleExport(context.Background(), testmap, nil) } +type mockPRWTelemetry struct { + failedTranslations int + translatedTimeSeries int +} + +func (m *mockPRWTelemetry) recordTranslationFailure(_ context.Context) { + m.failedTranslations++ +} + +func (m *mockPRWTelemetry) recordTranslatedTimeSeries(_ context.Context, numTs int) { + m.translatedTimeSeries += numTs +} + // Test_PushMetrics checks the number of TimeSeries received by server and the number of metrics dropped is the same as // expected func Test_PushMetrics(t *testing.T) { @@ -462,19 +475,23 @@ func Test_PushMetrics(t *testing.T) { } tests := []struct { - name string - metrics pmetric.Metrics - reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) - expectedTimeSeries int - httpResponseCode int - returnErr bool - isStaleMarker bool - skipForWAL bool + name string + metrics pmetric.Metrics + reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) + expectedTimeSeries int + httpResponseCode int + returnErr bool + isStaleMarker bool + skipForWAL bool + hasFailedTranslation bool }{ { - name: "invalid_type_case", - metrics: invalidTypeBatch, - httpResponseCode: http.StatusAccepted, + name: 
"invalid_type_case", + metrics: invalidTypeBatch, + httpResponseCode: http.StatusAccepted, + reqTestFunc: checkFunc, + expectedTimeSeries: 1, // the resource target metric. + hasFailedTranslation: true, }, { name: "intSum_case", @@ -571,35 +588,40 @@ func Test_PushMetrics(t *testing.T) { skipForWAL: true, }, { - name: "emptyGauge_case", - metrics: emptyDoubleGaugeBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, + name: "emptyGauge_case", + metrics: emptyDoubleGaugeBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + hasFailedTranslation: true, }, { - name: "emptyCumulativeSum_case", - metrics: emptyCumulativeSumBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, + name: "emptyCumulativeSum_case", + metrics: emptyCumulativeSumBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + hasFailedTranslation: true, }, { - name: "emptyCumulativeHistogram_case", - metrics: emptyCumulativeHistogramBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, + name: "emptyCumulativeHistogram_case", + metrics: emptyCumulativeHistogramBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + hasFailedTranslation: true, }, { - name: "emptySummary_case", - metrics: emptySummaryBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, + name: "emptySummary_case", + metrics: emptySummaryBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + hasFailedTranslation: true, }, { - name: "partialSuccess_case", - metrics: partialSuccess1, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - expectedTimeSeries: 4, + name: "partialSuccess_case", + metrics: partialSuccess1, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedTimeSeries: 4, + hasFailedTranslation: true, }, { name: "staleNaNIntGauge_case", @@ -675,6 +697,7 @@ func Test_PushMetrics(t *testing.T) { } t.Run(tt.name, func(t *testing.T) { 
t.Parallel() + mockTelemetry := &mockPRWTelemetry{} server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if tt.reqTestFunc != nil { tt.reqTestFunc(t, r, tt.expectedTimeSeries, tt.isStaleMarker) @@ -723,7 +746,9 @@ func Test_PushMetrics(t *testing.T) { } set := exportertest.NewNopCreateSettings() set.BuildInfo = buildInfo - prwe, nErr := newPRWExporter(cfg, set) + + prwe, nErr := newPRWExporter(cfg, set, mockTelemetry) + require.NoError(t, nErr) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -736,6 +761,13 @@ func Test_PushMetrics(t *testing.T) { assert.Error(t, err) return } + + if tt.hasFailedTranslation { + assert.Equal(t, 1, mockTelemetry.failedTranslations) + } else { + assert.Equal(t, 0, mockTelemetry.failedTranslations) + } + assert.Equal(t, tt.expectedTimeSeries, mockTelemetry.translatedTimeSeries) assert.NoError(t, err) }) } @@ -901,7 +933,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) { Version: "1.0", } - prwe, perr := newPRWExporter(cfg, set) + prwe, perr := newPRWExporter(cfg, set, newPRWTelemetry(set)) assert.NoError(t, perr) nopHost := componenttest.NewNopHost() @@ -979,7 +1011,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) { // 4. Finally, ensure that the bytes that were uploaded to the // Prometheus Remote Write endpoint are exactly as were saved in the WAL. // Read from that same WAL, export to the RWExporter server. 
- prwe2, err := newPRWExporter(cfg, set) + prwe2, err := newPRWExporter(cfg, set, newPRWTelemetry(set)) assert.NoError(t, err) require.NoError(t, prwe2.Start(ctx, nopHost)) t.Cleanup(func() { diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index d7fe203fb356e..2d0b99a25f0ce 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -35,7 +35,8 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings, return nil, errors.New("invalid configuration") } - prwe, err := newPRWExporter(prwCfg, set) + telemetry := newPRWTelemetry(set) + prwe, err := newPRWExporter(prwCfg, set, telemetry) if err != nil { return nil, err } From b3a6ce61eacc15c470e58341b23e464f580ce127 Mon Sep 17 00:00:00 2001 From: Raphael Philipe Mendes da Silva Date: Fri, 12 Jan 2024 10:59:05 -0800 Subject: [PATCH 03/11] Fix and simplify log line in case of failure to translate Co-authored-by: Anthony Mirabella --- exporter/prometheusremotewriteexporter/exporter.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 295c1af0b50ca..0e869e4dd533d 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -180,8 +180,7 @@ func (prwe *prwExporter) PushMetrics(ctx context.Context, md pmetric.Metrics) er tsMap, err := prometheusremotewrite.FromMetrics(md, prwe.exporterSettings) if err != nil { prwe.telemetry.recordTranslationFailure(ctx) - prwe.settings.Logger.Debug("failed to translate metrics %s", zap.Error(err)) - prwe.settings.Logger.Debug("exporting remaining %s metrics", zap.Int("translated", len(tsMap))) + prwe.settings.Logger.Debug("failed to translate metrics, exporting remaining metrics", zap.Error(err), zap.Int("translated", len(tsMap))) } 
prwe.telemetry.recordTranslatedTimeSeries(ctx, len(tsMap)) From a128f16a605f85c49600ca3f2a7476a7011e0640 Mon Sep 17 00:00:00 2001 From: Raphael Philipe Mendes da Silva Date: Fri, 12 Jan 2024 10:59:30 -0800 Subject: [PATCH 04/11] Simplify tests Co-authored-by: bryan-aguilar <46550959+bryan-aguilar@users.noreply.github.com> --- exporter/prometheusremotewriteexporter/exporter_test.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index a95810af62e87..948041a13c0b9 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -483,7 +483,7 @@ func Test_PushMetrics(t *testing.T) { returnErr bool isStaleMarker bool skipForWAL bool - hasFailedTranslation bool + expectedFailedTranslations int }{ { name: "invalid_type_case", @@ -762,11 +762,7 @@ func Test_PushMetrics(t *testing.T) { return } - if tt.hasFailedTranslation { - assert.Equal(t, 1, mockTelemetry.failedTranslations) - } else { - assert.Equal(t, 0, mockTelemetry.failedTranslations) - } + assert.Equal(t, tt.ExpectedFailedTranslations, mockTelemetry.failedTranslations) assert.Equal(t, tt.expectedTimeSeries, mockTelemetry.translatedTimeSeries) assert.NoError(t, err) }) From 84af853b994de258d90ffe1193889d310141da91 Mon Sep 17 00:00:00 2001 From: Raphael Silva Date: Fri, 12 Jan 2024 19:27:45 +0000 Subject: [PATCH 05/11] Fix: fix based on code review Signed-off-by: Raphael Silva --- .../prometheusremotewriteexporter/exporter.go | 30 ++++-- .../exporter_test.go | 95 ++++++++++--------- .../prometheusremotewriteexporter/factory.go | 3 +- 3 files changed, 69 insertions(+), 59 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 0e869e4dd533d..4bdca75cc5950 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ 
b/exporter/prometheusremotewriteexporter/exporter.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.uber.org/multierr" "go.uber.org/zap" @@ -42,14 +43,15 @@ type prwTelemetry interface { type prwTelemetryOtel struct { failedTranslations metric.Int64Counter translatedTimeSeries metric.Int64Counter + otelAttrs []attribute.KeyValue } func (p *prwTelemetryOtel) recordTranslationFailure(ctx context.Context) { - p.failedTranslations.Add(ctx, 1) + p.failedTranslations.Add(ctx, 1, metric.WithAttributes(p.otelAttrs...)) } func (p *prwTelemetryOtel) recordTranslatedTimeSeries(ctx context.Context, numTS int) { - p.translatedTimeSeries.Add(ctx, int64(numTS)) + p.translatedTimeSeries.Add(ctx, int64(numTS), metric.WithAttributes(p.otelAttrs...)) } // prwExporter converts OTLP metrics to Prometheus remote write TimeSeries and sends them to a remote endpoint. 
@@ -69,30 +71,33 @@ type prwExporter struct { telemetry prwTelemetry } -func newPRWTelemetry(set exporter.CreateSettings) prwTelemetry { +func newPRWTelemetry(set exporter.CreateSettings) (prwTelemetry, error) { meter := metadata.Meter(set.TelemetrySettings) // TODO: create helper functions similar to the processor helper: BuildCustomMetricName prefix := "exporter/" + metadata.Type + "/" - failedTranslations, _ := meter.Int64Counter(prefix+"failed_translations", - metric.WithDescription("Number of translation operations that failed to translate metrics from OTEL to Prometheus"), + failedTranslations, errFailedTranslation := meter.Int64Counter(prefix+"failed_translations", + metric.WithDescription("Number of translation operations that failed to translate metrics from Otel to Prometheus"), metric.WithUnit("1"), ) - translatedTimeSeries, _ := meter.Int64Counter(prefix+"translated_metrics", - metric.WithDescription("Number of Prometheus time series that were translated from OTEL metrics"), + translatedTimeSeries, errTranslatedMetrics := meter.Int64Counter(prefix+"translated_metrics", + metric.WithDescription("Number of Prometheus time series that were translated from OTel metrics"), metric.WithUnit("1"), ) return &prwTelemetryOtel{ failedTranslations: failedTranslations, translatedTimeSeries: translatedTimeSeries, - } + otelAttrs: []attribute.KeyValue{ + attribute.String("exporter", set.ID.String()), + }, + }, errors.Join(errFailedTranslation, errTranslatedMetrics) } // newPRWExporter initializes a new prwExporter instance and sets fields accordingly. 
-func newPRWExporter(cfg *Config, set exporter.CreateSettings, telemetry prwTelemetry) (*prwExporter, error) { +func newPRWExporter(cfg *Config, set exporter.CreateSettings) (*prwExporter, error) { sanitizedLabels, err := validateAndSanitizeExternalLabels(cfg) if err != nil { return nil, err @@ -103,6 +108,11 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings, telemetry prwTelem return nil, errors.New("invalid endpoint") } + prwTelemetry, err := newPRWTelemetry(set) + if err != nil { + return nil, err + } + userAgentHeader := fmt.Sprintf("%s/%s", strings.ReplaceAll(strings.ToLower(set.BuildInfo.Description), " ", "-"), set.BuildInfo.Version) prwe := &prwExporter{ @@ -123,7 +133,7 @@ func newPRWExporter(cfg *Config, set exporter.CreateSettings, telemetry prwTelem AddMetricSuffixes: cfg.AddMetricSuffixes, SendMetadata: cfg.SendMetadata, }, - telemetry: telemetry, + telemetry: prwTelemetry, } if cfg.WAL == nil { return prwe, nil diff --git a/exporter/prometheusremotewriteexporter/exporter_test.go b/exporter/prometheusremotewriteexporter/exporter_test.go index 948041a13c0b9..9e033ce8ddb3a 100644 --- a/exporter/prometheusremotewriteexporter/exporter_test.go +++ b/exporter/prometheusremotewriteexporter/exporter_test.go @@ -111,7 +111,7 @@ func Test_NewPRWExporter(t *testing.T) { cfg.ExternalLabels = tt.externalLabels cfg.Namespace = tt.namespace cfg.RemoteWriteQueue.NumConsumers = 1 - prwe, err := newPRWExporter(cfg, tt.set, newPRWTelemetry(set)) + prwe, err := newPRWExporter(cfg, tt.set) if tt.returnErrorOnCreate { assert.Error(t, err) @@ -200,7 +200,7 @@ func Test_Start(t *testing.T) { cfg.RemoteWriteQueue.NumConsumers = 1 cfg.HTTPClientSettings = tt.clientSettings - prwe, err := newPRWExporter(cfg, tt.set, newPRWTelemetry(tt.set)) + prwe, err := newPRWExporter(cfg, tt.set) assert.NoError(t, err) assert.NotNil(t, prwe) @@ -360,7 +360,7 @@ func runExportPipeline(ts *prompb.TimeSeries, endpoint *url.URL) error { set := exportertest.NewNopCreateSettings() 
set.BuildInfo = buildInfo // after this, instantiate a CortexExporter with the current HTTP client and endpoint set to passed in endpoint - prwe, err := newPRWExporter(cfg, set, newPRWTelemetry(set)) + prwe, err := newPRWExporter(cfg, set) if err != nil { return err } @@ -475,23 +475,23 @@ func Test_PushMetrics(t *testing.T) { } tests := []struct { - name string - metrics pmetric.Metrics - reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) - expectedTimeSeries int - httpResponseCode int - returnErr bool - isStaleMarker bool - skipForWAL bool + name string + metrics pmetric.Metrics + reqTestFunc func(t *testing.T, r *http.Request, expected int, isStaleMarker bool) + expectedTimeSeries int + httpResponseCode int + returnErr bool + isStaleMarker bool + skipForWAL bool expectedFailedTranslations int }{ { - name: "invalid_type_case", - metrics: invalidTypeBatch, - httpResponseCode: http.StatusAccepted, - reqTestFunc: checkFunc, - expectedTimeSeries: 1, // the resource target metric. - hasFailedTranslation: true, + name: "invalid_type_case", + metrics: invalidTypeBatch, + httpResponseCode: http.StatusAccepted, + reqTestFunc: checkFunc, + expectedTimeSeries: 1, // the resource target metric. 
+ expectedFailedTranslations: 1, }, { name: "intSum_case", @@ -588,40 +588,40 @@ func Test_PushMetrics(t *testing.T) { skipForWAL: true, }, { - name: "emptyGauge_case", - metrics: emptyDoubleGaugeBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - hasFailedTranslation: true, + name: "emptyGauge_case", + metrics: emptyDoubleGaugeBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedFailedTranslations: 1, }, { - name: "emptyCumulativeSum_case", - metrics: emptyCumulativeSumBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - hasFailedTranslation: true, + name: "emptyCumulativeSum_case", + metrics: emptyCumulativeSumBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedFailedTranslations: 1, }, { - name: "emptyCumulativeHistogram_case", - metrics: emptyCumulativeHistogramBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - hasFailedTranslation: true, + name: "emptyCumulativeHistogram_case", + metrics: emptyCumulativeHistogramBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedFailedTranslations: 1, }, { - name: "emptySummary_case", - metrics: emptySummaryBatch, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - hasFailedTranslation: true, + name: "emptySummary_case", + metrics: emptySummaryBatch, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedFailedTranslations: 1, }, { - name: "partialSuccess_case", - metrics: partialSuccess1, - reqTestFunc: checkFunc, - httpResponseCode: http.StatusAccepted, - expectedTimeSeries: 4, - hasFailedTranslation: true, + name: "partialSuccess_case", + metrics: partialSuccess1, + reqTestFunc: checkFunc, + httpResponseCode: http.StatusAccepted, + expectedTimeSeries: 4, + expectedFailedTranslations: 1, }, { name: "staleNaNIntGauge_case", @@ -747,7 +747,8 @@ func Test_PushMetrics(t *testing.T) { set := exportertest.NewNopCreateSettings() 
set.BuildInfo = buildInfo - prwe, nErr := newPRWExporter(cfg, set, mockTelemetry) + prwe, nErr := newPRWExporter(cfg, set) + prwe.telemetry = mockTelemetry require.NoError(t, nErr) ctx, cancel := context.WithCancel(context.Background()) @@ -762,7 +763,7 @@ func Test_PushMetrics(t *testing.T) { return } - assert.Equal(t, tt.ExpectedFailedTranslations, mockTelemetry.failedTranslations) + assert.Equal(t, tt.expectedFailedTranslations, mockTelemetry.failedTranslations) assert.Equal(t, tt.expectedTimeSeries, mockTelemetry.translatedTimeSeries) assert.NoError(t, err) }) @@ -929,7 +930,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) { Version: "1.0", } - prwe, perr := newPRWExporter(cfg, set, newPRWTelemetry(set)) + prwe, perr := newPRWExporter(cfg, set) assert.NoError(t, perr) nopHost := componenttest.NewNopHost() @@ -1007,7 +1008,7 @@ func TestWALOnExporterRoundTrip(t *testing.T) { // 4. Finally, ensure that the bytes that were uploaded to the // Prometheus Remote Write endpoint are exactly as were saved in the WAL. // Read from that same WAL, export to the RWExporter server. 
- prwe2, err := newPRWExporter(cfg, set, newPRWTelemetry(set)) + prwe2, err := newPRWExporter(cfg, set) assert.NoError(t, err) require.NoError(t, prwe2.Start(ctx, nopHost)) t.Cleanup(func() { diff --git a/exporter/prometheusremotewriteexporter/factory.go b/exporter/prometheusremotewriteexporter/factory.go index 2d0b99a25f0ce..d7fe203fb356e 100644 --- a/exporter/prometheusremotewriteexporter/factory.go +++ b/exporter/prometheusremotewriteexporter/factory.go @@ -35,8 +35,7 @@ func createMetricsExporter(ctx context.Context, set exporter.CreateSettings, return nil, errors.New("invalid configuration") } - telemetry := newPRWTelemetry(set) - prwe, err := newPRWExporter(prwCfg, set, telemetry) + prwe, err := newPRWExporter(prwCfg, set) if err != nil { return nil, err } From bffdd8c4f8d6989da9c7844d91440f59cd78c7a3 Mon Sep 17 00:00:00 2001 From: Raphael Silva Date: Fri, 12 Jan 2024 19:36:00 +0000 Subject: [PATCH 06/11] Chore: Update metric name Signed-off-by: Raphael Silva --- exporter/prometheusremotewriteexporter/exporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 4bdca75cc5950..f022e9eedbdd4 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -82,7 +82,7 @@ func newPRWTelemetry(set exporter.CreateSettings) (prwTelemetry, error) { metric.WithUnit("1"), ) - translatedTimeSeries, errTranslatedMetrics := meter.Int64Counter(prefix+"translated_metrics", + translatedTimeSeries, errTranslatedMetrics := meter.Int64Counter(prefix+"translated_time_series", metric.WithDescription("Number of Prometheus time series that were translated from OTel metrics"), metric.WithUnit("1"), ) From faf0fddea0e56e021f8efc34c65b82eb0ec2037a Mon Sep 17 00:00:00 2001 From: Raphael Silva Date: Fri, 12 Jan 2024 21:46:55 +0000 Subject: [PATCH 07/11] Fix: Fix go.mod Signed-off-by: Raphael Silva --- 
exporter/prometheusremotewriteexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index 95b562b2ad976..741a20bff9f72 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/collector/consumer v0.92.1-0.20240110091511-bf804d6c4ecc go.opentelemetry.io/collector/exporter v0.92.1-0.20240110091511-bf804d6c4ecc go.opentelemetry.io/collector/pdata v1.0.2-0.20240110091511-bf804d6c4ecc + go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel/metric v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/multierr v1.11.0 @@ -65,7 +66,6 @@ require ( go.opentelemetry.io/collector/receiver v0.92.1-0.20240110091511-bf804d6c4ecc // indirect go.opentelemetry.io/collector/semconv v0.92.1-0.20240110091511-bf804d6c4ecc // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect - go.opentelemetry.io/otel v1.21.0 // indirect golang.org/x/net v0.20.0 // indirect golang.org/x/sys v0.16.0 // indirect golang.org/x/text v0.14.0 // indirect From f60d68e3c84051e400af965b58283d858ef68aa5 Mon Sep 17 00:00:00 2001 From: Raphael Silva Date: Fri, 12 Jan 2024 22:13:48 +0000 Subject: [PATCH 08/11] Chore: fix go.mod Signed-off-by: Raphael Silva --- exporter/prometheusremotewriteexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index 9789d4d996b2a..b734b14f0706c 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/collector/consumer v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/collector/exporter v0.92.1-0.20240112172857-83d463ceba06 go.opentelemetry.io/collector/pdata 
v1.0.2-0.20240112172857-83d463ceba06 + go.opentelemetry.io/otel v1.21.0 go.opentelemetry.io/otel/metric v1.21.0 go.opentelemetry.io/otel/trace v1.21.0 go.uber.org/multierr v1.11.0 @@ -76,7 +77,6 @@ require ( go.opentelemetry.io/collector/receiver v0.92.1-0.20240112172857-83d463ceba06 // indirect go.opentelemetry.io/collector/semconv v0.92.1-0.20240112172857-83d463ceba06 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect - go.opentelemetry.io/otel v1.21.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.44.1-0.20231201153405-6027c1ae76f2 // indirect go.opentelemetry.io/otel/sdk v1.21.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect From 0fad48b0f298a8cea110bf83807e3caf723d6fa9 Mon Sep 17 00:00:00 2001 From: Raphael Silva Date: Tue, 30 Jan 2024 17:10:30 +0000 Subject: [PATCH 09/11] Fix go.mod Signed-off-by: Raphael Silva --- exporter/prometheusremotewriteexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index fe81ac6322590..370b1a9ef71b3 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/collector/consumer v0.93.1-0.20240129215828-1ed45ec12569 go.opentelemetry.io/collector/exporter v0.93.1-0.20240129215828-1ed45ec12569 go.opentelemetry.io/collector/pdata v1.0.2-0.20240129215828-1ed45ec12569 + go.opentelemetry.io/otel v1.22.0 go.opentelemetry.io/otel/metric v1.22.0 go.opentelemetry.io/otel/trace v1.22.0 go.uber.org/multierr v1.11.0 @@ -75,7 +76,6 @@ require ( go.opentelemetry.io/collector/receiver v0.93.1-0.20240129215828-1ed45ec12569 // indirect go.opentelemetry.io/collector/semconv v0.93.1-0.20240129215828-1ed45ec12569 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect - go.opentelemetry.io/otel v1.22.0 // indirect 
go.opentelemetry.io/otel/exporters/prometheus v0.45.0 // indirect go.opentelemetry.io/otel/sdk v1.22.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.22.0 // indirect From 2bc92ca932629ecb6da9076cb335180ac5e06b7c Mon Sep 17 00:00:00 2001 From: Raphael Silva Date: Fri, 1 Mar 2024 01:06:51 +0000 Subject: [PATCH 10/11] Fix merge conflicts --- exporter/prometheusremotewriteexporter/exporter.go | 3 +-- exporter/prometheusremotewriteexporter/go.mod | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/exporter/prometheusremotewriteexporter/exporter.go b/exporter/prometheusremotewriteexporter/exporter.go index 4408bbd8ebedb..51825e709c0ab 100644 --- a/exporter/prometheusremotewriteexporter/exporter.go +++ b/exporter/prometheusremotewriteexporter/exporter.go @@ -75,8 +75,7 @@ func newPRWTelemetry(set exporter.CreateSettings) (prwTelemetry, error) { meter := metadata.Meter(set.TelemetrySettings) // TODO: create helper functions similar to the processor helper: BuildCustomMetricName - prefix := "exporter/" + metadata.Type + "/" - + prefix := "exporter/" + metadata.Type.String() + "/" failedTranslations, errFailedTranslation := meter.Int64Counter(prefix+"failed_translations", metric.WithDescription("Number of translation operations that failed to translate metrics from Otel to Prometheus"), metric.WithUnit("1"), diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index a084f3314b6a7..af2f38f04a663 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/collector/consumer v0.95.0 go.opentelemetry.io/collector/exporter v0.95.0 go.opentelemetry.io/collector/pdata v1.2.0 + go.opentelemetry.io/otel v1.23.1 go.opentelemetry.io/otel/metric v1.23.1 go.opentelemetry.io/otel/trace v1.23.1 go.uber.org/multierr v1.11.0 @@ -70,7 +71,6 @@ require ( go.opentelemetry.io/collector/receiver v0.95.0 // indirect 
go.opentelemetry.io/collector/semconv v0.95.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.47.0 // indirect - go.opentelemetry.io/otel v1.23.1 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.45.2 // indirect go.opentelemetry.io/otel/sdk v1.23.1 // indirect go.opentelemetry.io/otel/sdk/metric v1.23.1 // indirect From e8ea8462a2f9ead9762a4df2f842e6bcd66467c3 Mon Sep 17 00:00:00 2001 From: Bryan Aguilar Date: Wed, 13 Mar 2024 09:45:39 -0700 Subject: [PATCH 11/11] make tidy --- exporter/prometheusremotewriteexporter/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/prometheusremotewriteexporter/go.mod b/exporter/prometheusremotewriteexporter/go.mod index dab7359e2b611..c4ebe696213ed 100644 --- a/exporter/prometheusremotewriteexporter/go.mod +++ b/exporter/prometheusremotewriteexporter/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/collector/consumer v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/collector/exporter v0.96.1-0.20240306115632-b2693620eff6 go.opentelemetry.io/collector/pdata v1.3.1-0.20240306115632-b2693620eff6 + go.opentelemetry.io/otel v1.24.0 go.opentelemetry.io/otel/metric v1.24.0 go.opentelemetry.io/otel/trace v1.24.0 go.uber.org/goleak v1.3.0 @@ -70,7 +71,6 @@ require ( go.opentelemetry.io/collector/receiver v0.96.1-0.20240306115632-b2693620eff6 // indirect go.opentelemetry.io/collector/semconv v0.96.1-0.20240306115632-b2693620eff6 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect - go.opentelemetry.io/otel v1.24.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.46.0 // indirect go.opentelemetry.io/otel/sdk v1.24.0 // indirect go.opentelemetry.io/otel/sdk/metric v1.24.0 // indirect