From 3af987a525baefd055310b4e49f920dd5d47c8db Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Tue, 19 Jul 2022 11:25:28 -0700 Subject: [PATCH 1/2] [exporter/signalfx] Support removal of direction attribute Host metrics have split the direction attribute into multiple metrics. The signalfx exporter has translation functionality that assumed metrics were of the previous format, so this change supports the new format, and drops support of the old. This change also adds an "add_dimensions" translate ability, tests this new functionality, and modifies tests to ensure output metric format matches original. --- exporter/signalfxexporter/README.md | 1 + exporter/signalfxexporter/factory_test.go | 190 +++++---- .../internal/translation/constants.go | 131 ++++-- .../internal/translation/translator.go | 46 ++- .../internal/translation/translator_test.go | 376 ++++++++++++++++++ .../signalfx_exporter_remove_direction.yaml | 16 + 6 files changed, 644 insertions(+), 116 deletions(-) create mode 100755 unreleased/signalfx_exporter_remove_direction.yaml diff --git a/exporter/signalfxexporter/README.md b/exporter/signalfxexporter/README.md index d742d93716d94..d6de49daf053b 100644 --- a/exporter/signalfxexporter/README.md +++ b/exporter/signalfxexporter/README.md @@ -156,6 +156,7 @@ The `translation_rules` metrics configuration field accepts a list of metric-tra help ensure compatibility with custom charts and dashboards when using the OpenTelemetry Collector. It also provides the ability to produce custom metrics by copying, calculating new, or aggregating other metric values without requiring an additional processor. The rule language is expressed in yaml mappings and is [documented here](./internal/translation/translator.go). Translation rules currently allow the following actions: +* `add_dimensions` - Adds dimensions for specified metrics, or globally * `aggregate_metric` - Aggregates a metric through removal of specified dimensions * `calculate_new_metric` - Creates a new metric via operating on two consistuent ones * `convert_values` - Convert float values to int or int to float for specified metric names diff --git a/exporter/signalfxexporter/factory_test.go b/exporter/signalfxexporter/factory_test.go index ddf74f4cf735e..28ee70c002a1e 100644 --- a/exporter/signalfxexporter/factory_test.go +++ b/exporter/signalfxexporter/factory_test.go @@ -188,7 +188,7 @@ func TestDefaultTranslationRules(t *testing.T) { rules, err := loadDefaultTranslationRules() require.NoError(t, err) require.NotNil(t, rules, "rules are nil") - tr, err := translation.NewMetricTranslator(rules, 1) + tr, err := translation.NewMetricTranslator(rules, 600) require.NoError(t, err) data := testMetricsData() @@ -210,17 +210,17 @@ func TestDefaultTranslationRules(t *testing.T) { // system.network.operations.total new metric calculation dps, ok = metrics["system.disk.operations.total"] - require.True(t, ok, "system.network.operations.total metrics not found") + require.True(t, ok, "system.disk.operations.total metrics not found") require.Len(t, dps, 4) require.Equal(t, 2, len(dps[0].Dimensions)) // system.network.io.total new metric calculation dps, ok = metrics["system.disk.io.total"] - require.True(t, ok, "system.network.io.total metrics not found") + require.True(t, ok, "system.disk.io.total metrics not found") require.Len(t, dps, 2) require.Equal(t, 2, len(dps[0].Dimensions)) for _, dp := range dps { - require.Equal(t, "direction", dp.Dimensions[0].Key) + require.Equal(t, "direction", dp.Dimensions[1].Key) switch dp.Dimensions[1].Value { case "write": require.Equal(t, int64(11e9), *dp.Value.IntValue) @@ -231,13 +231,20 @@ func TestDefaultTranslationRules(t *testing.T) { // disk_ops.total gauge from system.disk.operations cumulative, where is disk_ops.total // is the cumulative across devices and directions. + // Test uses 4 metrics, each with more than 1 data point. Aggregate by sum reduces each metric to 1 data point. + // 4 metrics, each with one data point, create 3 delta data points, either how much the current data point is + // larger than previous, or current data point's value (if smaller than previous). dps, ok = metrics["disk_ops.total"] require.True(t, ok, "disk_ops.total metrics not found") - require.Len(t, dps, 1) - require.Equal(t, int64(8e3), *dps[0].Value.IntValue) - require.Equal(t, 1, len(dps[0].Dimensions)) - require.Equal(t, "host", dps[0].Dimensions[0].Key) - require.Equal(t, "host0", dps[0].Dimensions[0].Value) + require.Len(t, dps, 3) + require.Equal(t, int64(4e3), *dps[0].Value.IntValue) + require.Equal(t, int64(4e3), *dps[1].Value.IntValue) + require.Equal(t, int64(10e3), *dps[2].Value.IntValue) + for _, dp := range dps { + require.Equal(t, 1, len(dp.Dimensions)) + require.Equal(t, "host", dp.Dimensions[0].Key) + require.Equal(t, "host0", dp.Dimensions[0].Value) + } // system.network.io.total new metric calculation dps, ok = metrics["system.network.io.total"] @@ -251,15 +258,19 @@ func TestDefaultTranslationRules(t *testing.T) { require.Len(t, dps, 1) require.Equal(t, 4, len(dps[0].Dimensions)) require.Equal(t, int64(350), *dps[0].Value.IntValue) - require.Equal(t, "direction", dps[0].Dimensions[0].Key) - require.Equal(t, "receive", dps[0].Dimensions[0].Value) + require.Equal(t, "direction", dps[0].Dimensions[3].Key) + require.Equal(t, "receive", dps[0].Dimensions[3].Value) // network.total new metric calculation dps, ok = metrics["network.total"] require.True(t, ok, "network.total metrics not found") - require.Len(t, dps, 1) + // Expect two data points because two metrics are sent in. Aggregation is within a single metric, so we can't + // combine metrics. + require.Len(t, dps, 2) require.Equal(t, 3, len(dps[0].Dimensions)) - require.Equal(t, int64(10e9), *dps[0].Value.IntValue) + require.Equal(t, 3, len(dps[1].Dimensions)) + require.Equal(t, int64(4e9), *dps[0].Value.IntValue) + require.Equal(t, int64(6e9), *dps[1].Value.IntValue) } func TestCreateMetricsExporterWithDefaultExcludeMetrics(t *testing.T) { @@ -341,129 +352,140 @@ func testMetricsData() pmetric.Metrics { dp12.SetIntVal(6e9) m2 := ms.AppendEmpty() - m2.SetName("system.disk.io") + m2.SetName("system.disk.io.read") m2.SetDescription("Disk I/O.") m2.SetDataType(pmetric.MetricDataTypeSum) m2.Sum().SetIsMonotonic(true) m2.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) dp21 := m2.Sum().DataPoints().AppendEmpty() dp21.Attributes().InsertString("host", "host0") - dp21.Attributes().InsertString("direction", "read") dp21.Attributes().InsertString("device", "sda1") dp21.Attributes().Sort() dp21.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp21.SetIntVal(1e9) dp22 := m2.Sum().DataPoints().AppendEmpty() dp22.Attributes().InsertString("host", "host0") - dp22.Attributes().InsertString("direction", "read") dp22.Attributes().InsertString("device", "sda2") dp22.Attributes().Sort() dp22.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp22.SetIntVal(2e9) - dp23 := m2.Sum().DataPoints().AppendEmpty() + m3 := ms.AppendEmpty() + m3.SetName("system.disk.io.write") + m3.SetDescription("Disk I/O.") + m3.SetDataType(pmetric.MetricDataTypeSum) + m3.Sum().SetIsMonotonic(true) + m3.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) + dp23 := m3.Sum().DataPoints().AppendEmpty() dp23.Attributes().InsertString("host", "host0") - dp23.Attributes().InsertString("direction", "write") dp23.Attributes().InsertString("device", "sda1") dp23.Attributes().Sort() dp23.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp23.SetIntVal(3e9) - dp24 := m2.Sum().DataPoints().AppendEmpty() + dp24 := m3.Sum().DataPoints().AppendEmpty() dp24.Attributes().InsertString("host", "host0") - dp24.Attributes().InsertString("direction", "write") dp24.Attributes().InsertString("device", "sda2") dp24.Attributes().Sort() dp24.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp24.SetIntVal(8e9) - m3 := ms.AppendEmpty() - m3.SetName("system.disk.operations") - m3.SetDescription("Disk operations count.") - m3.SetUnit("bytes") - m3.SetDataType(pmetric.MetricDataTypeSum) - m3.Sum().SetIsMonotonic(true) - m3.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) - dp31 := m3.Sum().DataPoints().AppendEmpty() - dp31.Attributes().InsertString("host", "host0") - dp31.Attributes().InsertString("direction", "write") - dp31.Attributes().InsertString("device", "sda1") - dp31.Attributes().Sort() - dp31.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) - dp31.SetIntVal(4e3) - dp32 := m3.Sum().DataPoints().AppendEmpty() + m6 := ms.AppendEmpty() + m6.SetName("system.disk.operations.read") + m6.SetDescription("Disk operations read count.") + m6.SetUnit("bytes") + m6.SetDataType(pmetric.MetricDataTypeSum) + m6.Sum().SetIsMonotonic(true) + m6.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) + dp32 := m6.Sum().DataPoints().AppendEmpty() dp32.Attributes().InsertString("host", "host0") - dp32.Attributes().InsertString("direction", "read") dp32.Attributes().InsertString("device", "sda2") dp32.Attributes().Sort() dp32.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp32.SetIntVal(6e3) - dp33 := m3.Sum().DataPoints().AppendEmpty() + m7 := ms.AppendEmpty() + m7.SetName("system.disk.operations.write") + m7.SetDescription("Disk operations write count.") + m7.SetUnit("bytes") + m7.SetDataType(pmetric.MetricDataTypeSum) + m7.Sum().SetIsMonotonic(true) + m7.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) + dp31 := m7.Sum().DataPoints().AppendEmpty() + dp31.Attributes().InsertString("host", "host0") + dp31.Attributes().InsertString("device", "sda1") + dp31.Attributes().Sort() + dp31.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) + dp31.SetIntVal(4e3) + dp33 := m7.Sum().DataPoints().AppendEmpty() dp33.Attributes().InsertString("host", "host0") - dp33.Attributes().InsertString("direction", "write") dp33.Attributes().InsertString("device", "sda1") dp33.Attributes().Sort() dp33.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp33.SetIntVal(1e3) - dp34 := m3.Sum().DataPoints().AppendEmpty() + dp34 := m7.Sum().DataPoints().AppendEmpty() dp34.Attributes().InsertString("host", "host0") - dp34.Attributes().InsertString("direction", "write") dp34.Attributes().InsertString("device", "sda2") dp34.Attributes().Sort() dp34.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp34.SetIntVal(5e3) - m4 := ms.AppendEmpty() - m4.SetName("system.disk.operations") - m4.SetDescription("Disk operations count.") - m4.SetUnit("bytes") - m4.SetDataType(pmetric.MetricDataTypeSum) - m4.Sum().SetIsMonotonic(true) - m4.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) - dp41 := m4.Sum().DataPoints().AppendEmpty() + m8 := ms.AppendEmpty() + m8.SetName("system.disk.operations.read") + m8.SetDescription("Disk operations read count.") + m8.SetUnit("bytes") + m8.SetDataType(pmetric.MetricDataTypeSum) + m8.Sum().SetIsMonotonic(true) + m8.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) + dp41 := m8.Sum().DataPoints().AppendEmpty() dp41.Attributes().InsertString("host", "host0") - dp41.Attributes().InsertString("direction", "read") dp41.Attributes().InsertString("device", "sda1") dp41.Attributes().Sort() dp41.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0))) dp41.SetIntVal(6e3) - dp42 := m4.Sum().DataPoints().AppendEmpty() + dp42 := m8.Sum().DataPoints().AppendEmpty() dp42.Attributes().InsertString("host", "host0") - dp42.Attributes().InsertString("direction", "read") dp42.Attributes().InsertString("device", "sda2") dp42.Attributes().Sort() dp42.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0))) dp42.SetIntVal(8e3) - dp43 := m4.Sum().DataPoints().AppendEmpty() + m9 := ms.AppendEmpty() + m9.SetName("system.disk.operations.write") + m9.SetDescription("Disk operations write count.") + m9.SetUnit("bytes") + m9.SetDataType(pmetric.MetricDataTypeSum) + m9.Sum().SetIsMonotonic(true) + m9.Sum().SetAggregationTemporality(pmetric.MetricAggregationTemporalityCumulative) + dp43 := m9.Sum().DataPoints().AppendEmpty() dp43.Attributes().InsertString("host", "host0") - dp43.Attributes().InsertString("direction", "write") dp43.Attributes().InsertString("device", "sda1") dp43.Attributes().Sort() dp43.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0))) dp43.SetIntVal(3e3) - dp44 := m4.Sum().DataPoints().AppendEmpty() + dp44 := m9.Sum().DataPoints().AppendEmpty() dp44.Attributes().InsertString("host", "host0") - dp44.Attributes().InsertString("direction", "write") dp44.Attributes().InsertString("device", "sda2") dp44.Attributes().Sort() dp44.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000060, 0))) dp44.SetIntVal(7e3) - m5 := ms.AppendEmpty() - m5.SetName("system.network.io") - m5.SetDescription("The number of bytes transmitted and received") - m5.SetUnit("bytes") - m5.SetDataType(pmetric.MetricDataTypeGauge) - dp51 := m5.Gauge().DataPoints().AppendEmpty() + m10 := ms.AppendEmpty() + m10.SetName("system.network.io.receive") + m10.SetDescription("The number of bytes received") + m10.SetUnit("bytes") + m10.SetDataType(pmetric.MetricDataTypeGauge) + dp51 := m10.Gauge().DataPoints().AppendEmpty() dp51.Attributes().InsertString("host", "host0") - dp51.Attributes().InsertString("direction", "receive") dp51.Attributes().InsertString("device", "eth0") dp51.Attributes().InsertString("kubernetes_node", "node0") dp51.Attributes().InsertString("kubernetes_cluster", "cluster0") dp51.Attributes().Sort() dp51.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp51.SetIntVal(4e9) - dp52 := m5.Gauge().DataPoints().AppendEmpty() + m11 := ms.AppendEmpty() + m11.SetName("system.network.io.transmit") + m11.SetDescription("The number of bytes transmitted") + m11.SetUnit("bytes") + m11.SetDataType(pmetric.MetricDataTypeGauge) + dp52 := m11.Gauge().DataPoints().AppendEmpty() dp52.Attributes().InsertString("host", "host0") - dp52.Attributes().InsertString("direction", "transmit") dp52.Attributes().InsertString("device", "eth0") dp52.Attributes().InsertString("kubernetes_node", "node0") dp52.Attributes().InsertString("kubernetes_cluster", "cluster0") @@ -471,22 +493,20 @@ func testMetricsData() pmetric.Metrics { dp52.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp52.SetIntVal(6e9) - m6 := ms.AppendEmpty() - m6.SetName("system.network.packets") - m6.SetDescription("The number of packets transferred") - m6.SetDataType(pmetric.MetricDataTypeGauge) - dp61 := m6.Gauge().DataPoints().AppendEmpty() + m12 := ms.AppendEmpty() + m12.SetName("system.network.packets.receive") + m12.SetDescription("The number of packets received") + m12.SetDataType(pmetric.MetricDataTypeGauge) + dp61 := m12.Gauge().DataPoints().AppendEmpty() dp61.Attributes().InsertString("host", "host0") - dp61.Attributes().InsertString("direction", "receive") dp61.Attributes().InsertString("device", "eth0") dp61.Attributes().InsertString("kubernetes_node", "node0") dp61.Attributes().InsertString("kubernetes_cluster", "cluster0") dp61.Attributes().Sort() dp61.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp61.SetIntVal(200) - dp62 := m6.Gauge().DataPoints().AppendEmpty() + dp62 := m12.Gauge().DataPoints().AppendEmpty() dp62.Attributes().InsertString("host", "host0") - dp62.Attributes().InsertString("direction", "receive") dp62.Attributes().InsertString("device", "eth1") dp62.Attributes().InsertString("kubernetes_node", "node0") dp62.Attributes().InsertString("kubernetes_cluster", "cluster0") @@ -494,11 +514,11 @@ func testMetricsData() pmetric.Metrics { dp62.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp62.SetIntVal(150) - m7 := ms.AppendEmpty() - m7.SetName("container.memory.working_set") - m7.SetUnit("bytes") - m7.SetDataType(pmetric.MetricDataTypeGauge) - dp71 := m7.Gauge().DataPoints().AppendEmpty() + m14 := ms.AppendEmpty() + m14.SetName("container.memory.working_set") + m14.SetUnit("bytes") + m14.SetDataType(pmetric.MetricDataTypeGauge) + dp71 := m14.Gauge().DataPoints().AppendEmpty() dp71.Attributes().InsertString("host", "host0") dp71.Attributes().InsertString("kubernetes_node", "node0") dp71.Attributes().InsertString("kubernetes_cluster", "cluster0") @@ -506,10 +526,10 @@ func testMetricsData() pmetric.Metrics { dp71.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp71.SetIntVal(1000) - m8 := ms.AppendEmpty() - m8.SetName("container.memory.page_faults") - m8.SetDataType(pmetric.MetricDataTypeGauge) - dp81 := m8.Gauge().DataPoints().AppendEmpty() + m16 := ms.AppendEmpty() + m16.SetName("container.memory.page_faults") + m16.SetDataType(pmetric.MetricDataTypeGauge) + dp81 := m16.Gauge().DataPoints().AppendEmpty() dp81.Attributes().InsertString("host", "host0") dp81.Attributes().InsertString("kubernetes_node", "node0") dp81.Attributes().InsertString("kubernetes_cluster", "cluster0") @@ -517,10 +537,10 @@ func testMetricsData() pmetric.Metrics { dp81.SetTimestamp(pcommon.NewTimestampFromTime(time.Unix(1596000000, 0))) dp81.SetIntVal(1000) - m9 := ms.AppendEmpty() - m9.SetName("container.memory.major_page_faults") - m9.SetDataType(pmetric.MetricDataTypeGauge) - dp91 := m9.Gauge().DataPoints().AppendEmpty() + m20 := ms.AppendEmpty() + m20.SetName("container.memory.major_page_faults") + m20.SetDataType(pmetric.MetricDataTypeGauge) + dp91 := m20.Gauge().DataPoints().AppendEmpty() dp91.Attributes().InsertString("host", "host0") dp91.Attributes().InsertString("kubernetes_node", "node0") dp91.Attributes().InsertString("kubernetes_cluster", "cluster0") diff --git a/exporter/signalfxexporter/internal/translation/constants.go b/exporter/signalfxexporter/internal/translation/constants.go index 3c48b8fe3afe8..e7ba49fec6573 100644 --- a/exporter/signalfxexporter/internal/translation/constants.go +++ b/exporter/signalfxexporter/internal/translation/constants.go @@ -288,39 +288,76 @@ translation_rules: scale_factors_float: sf_temp.disk.summary_utilization: 100 - # Translations to derive disk I/O metrics. ## Calculate extra system.disk.operations.total and system.disk.io.total metrics summing up read/write ops/IO across all devices. - action: copy_metrics mapping: - system.disk.operations: sf_temp.system.disk.operations.total - system.disk.io: sf_temp.system.disk.io.total + system.disk.operations.read: sf_temp.system.disk.operations.read + system.disk.operations.write: sf_temp.system.disk.operations.write + +- action: add_dimensions + metric_name: sf_temp.system.disk.operations.read + dimension_pairs: + direction: + read: true +- action: add_dimensions + metric_name: sf_temp.system.disk.operations.write + dimension_pairs: + direction: + write: true + +- action: copy_metrics + mapping: + sf_temp.system.disk.operations.read: sf_temp.system.disk.operations.total + sf_temp.system.disk.operations.write: sf_temp.system.disk.operations.total + - action: aggregate_metric metric_name: sf_temp.system.disk.operations.total aggregation_method: sum without_dimensions: - device -- action: aggregate_metric - metric_name: sf_temp.system.disk.io.total - aggregation_method: sum - without_dimensions: - - device ## Calculate an extra disk_ops.total metric as number all all read and write operations happened since the last report. - action: copy_metrics mapping: - system.disk.operations: sf_temp.disk.ops + sf_temp.system.disk.operations.total: sf_temp.disk.ops - action: aggregate_metric metric_name: sf_temp.disk.ops aggregation_method: sum without_dimensions: - direction - - device - action: delta_metric mapping: sf_temp.disk.ops: disk_ops.total +- action: copy_metrics + mapping: + system.disk.io.read: sf_temp.system.disk.io.read + system.disk.io.write: sf_temp.system.disk.io.write + +- action: add_dimensions + metric_name: sf_temp.system.disk.io.read + dimension_pairs: + direction: + read: true +- action: add_dimensions + metric_name: sf_temp.system.disk.io.write + dimension_pairs: + direction: + write: true + +- action: copy_metrics + mapping: + sf_temp.system.disk.io.read: sf_temp.system.disk.io.total + sf_temp.system.disk.io.write: sf_temp.system.disk.io.total + +- action: aggregate_metric + metric_name: sf_temp.system.disk.io.total + aggregation_method: sum + without_dimensions: + - device + - action: delta_metric mapping: system.disk.pending_operations: disk_ops.pending @@ -330,33 +367,65 @@ translation_rules: ## Calculate extra network I/O metrics system.network.packets.total and system.network.io.total. - action: copy_metrics mapping: - system.network.packets: sf_temp.system.network.packets.total - system.network.io: sf_temp.system.network.io.total + system.network.packets.receive: sf_temp.system.network.packets.receive + system.network.packets.transmit: sf_temp.system.network.packets.transmit + system.network.io.receive: sf_temp.system.network.io.receive + system.network.io.transmit: sf_temp.system.network.io.transmit + +- action: add_dimensions + metric_name: sf_temp.system.network.packets.receive + dimension_pairs: + direction: + receive: true +- action: add_dimensions + metric_name: sf_temp.system.network.packets.transmit + dimension_pairs: + direction: + transmit: true +- action: copy_metrics + mapping: + sf_temp.system.network.packets.receive: sf_temp.system.network.packets.total + sf_temp.system.network.packets.transmit: sf_temp.system.network.packets.total + - action: aggregate_metric metric_name: sf_temp.system.network.packets.total aggregation_method: sum without_dimensions: - device + +- action: add_dimensions + metric_name: sf_temp.system.network.io.receive + dimension_pairs: + direction: + receive: true +- action: add_dimensions + metric_name: sf_temp.system.network.io.transmit + dimension_pairs: + direction: + transmit: true + +# Need two copies of metrics, one for aggregating with directions, one for total aggregation. +- action: copy_metrics + mapping: + sf_temp.system.network.io.receive: sf_temp.system.network.io.total + sf_temp.system.network.io.transmit: sf_temp.system.network.io.total + - action: aggregate_metric metric_name: sf_temp.system.network.io.total aggregation_method: sum without_dimensions: - device -## Calculate extra network.total metric. - action: copy_metrics mapping: - system.network.io: sf_temp.network.total - dimension_key: direction - dimension_values: - receive: true - transmit: true + sf_temp.system.network.io.total: sf_temp.network.total + +## Calculate extra network.total metric. - action: aggregate_metric metric_name: sf_temp.network.total aggregation_method: sum without_dimensions: - direction - - device # memory utilization - action: calculate_new_metric @@ -370,22 +439,20 @@ translation_rules: sf_temp.memory.utilization: 100 # Virtual memory metrics -- action: split_metric - metric_name: system.paging.operations - dimension_key: direction +- action: copy_metrics mapping: - page_in: sf_temp.system.paging.operations.page_in - page_out: sf_temp.system.paging.operations.page_out + system.paging.operations.in: sf_temp.system.paging.operations.in + system.paging.operations.out: sf_temp.system.paging.operations.out - action: split_metric - metric_name: sf_temp.system.paging.operations.page_in + metric_name: sf_temp.system.paging.operations.in dimension_key: type mapping: major: vmpage_io.swap.in minor: vmpage_io.memory.in - action: split_metric - metric_name: sf_temp.system.paging.operations.page_out + metric_name: sf_temp.system.paging.operations.out dimension_key: type mapping: major: vmpage_io.swap.out @@ -450,9 +517,17 @@ translation_rules: sf_temp.system.cpu.delta: true sf_temp.system.cpu.total: true sf_temp.system.cpu.usage: true + sf_temp.system.disk.io.read: true + sf_temp.system.disk.io.write: true + sf_temp.system.disk.operations.read: true + sf_temp.system.disk.operations.write: true sf_temp.system.filesystem.usage: true sf_temp.system.memory.usage: true - sf_temp.system.paging.operations.page_in: true - sf_temp.system.paging.operations.page_out: true + sf_temp.system.network.packets.receive: true + sf_temp.system.network.packets.transmit: true + sf_temp.system.network.io.receive: true + sf_temp.system.network.io.transmit: true + sf_temp.system.paging.operations.in: true + sf_temp.system.paging.operations.out: true ` ) diff --git a/exporter/signalfxexporter/internal/translation/translator.go b/exporter/signalfxexporter/internal/translation/translator.go index 7b1ac000d51b9..e31cee7d8f883 100644 --- a/exporter/signalfxexporter/internal/translation/translator.go +++ b/exporter/signalfxexporter/internal/translation/translator.go @@ -136,6 +136,20 @@ const ( // dimension_pairs: // dim_key1: ActionDropDimensions Action = "drop_dimensions" + + // ActionAddDimensions will add specified dimensions. If no corresponding metric is provided, the + // dimensions will be added globally to all datapoints. If no value is provided with dimension, an empty + // string will be the default value. + // - action: add_dimensions + // metric_names: + // system.disk.io + // dimension_pairs: + // dim_key1: value1 + // dim_key2: + // - action: add_dimensions + // dimension_pairs: + // dim_key1: + ActionAddDimensions Action = "add_dimensions" ) type MetricOperator string @@ -221,8 +235,8 @@ type Rule struct { Operand2Metric string `mapstructure:"operand2_metric"` Operator MetricOperator `mapstructure:"operator"` - // DimensionPairs used by "drop_dimensions" translation rule to specify dimension pairs that - // should be dropped. + // DimensionPairs used by the "drop_dimensions" and "add_dimensions" translation rules to specify dimension pairs + // that should be dropped or added, respectively. DimensionPairs map[string]map[string]bool `mapstructure:"dimension_pairs"` metricMatcher *dpfilters.StringFilter @@ -352,6 +366,10 @@ func validateTranslationRules(rules []Rule) error { if len(tr.DimensionPairs) == 0 { return fmt.Errorf(`field "dimension_pairs" is required for %q translation rule`, tr.Action) } + case ActionAddDimensions: + if len(tr.DimensionPairs) == 0 { + return fmt.Errorf(`field "dimension_pairs" is required for %q translation rule`, tr.Action) + } default: return fmt.Errorf("unknown \"action\" value: %q", tr.Action) } @@ -372,7 +390,7 @@ func createDimensionsMap(rules []Rule) map[string]string { func processRules(rules []Rule) error { for i, tr := range rules { - if tr.Action == ActionDropDimensions { + if tr.Action == ActionDropDimensions || tr.Action == ActionAddDimensions { // Set metric name filter, if metric name(s) are specified on the rule. // When "drop_dimensions" actions is not scoped to a metric name, the // specified dimensions will be globally dropped from all datapoints @@ -545,6 +563,11 @@ func (mp *MetricTranslator) TranslateDataPoints(logger *zap.Logger, sfxDataPoint for _, dp := range processedDataPoints { dropDimensions(dp, tr) } + + case ActionAddDimensions: + for _, dp := range processedDataPoints { + addDimensions(dp, tr) + } } } @@ -864,6 +887,23 @@ func dropDimensions(dp *sfxpb.DataPoint, rule Rule) { dp.Dimensions = processedDimensions } +func addDimensions(dp *sfxpb.DataPoint, rule Rule) { + if rule.metricMatcher != nil && !rule.metricMatcher.Matches(dp.Metric) { + return + } + + for dimName, dimValueMap := range rule.DimensionPairs { + if len(dimValueMap) == 0 { + dp.Dimensions = append(dp.Dimensions, &sfxpb.Dimension{Key: dimName, Value: ""}) + } else { + for dimVal := range dimValueMap { + dp.Dimensions = append(dp.Dimensions, &sfxpb.Dimension{Key: dimName, Value: dimVal}) + } + } + + } +} + func filterDimensionsByValues( dimensions []*sfxpb.Dimension, dimensionPairs map[string]map[string]bool) []*sfxpb.Dimension { diff --git a/exporter/signalfxexporter/internal/translation/translator_test.go b/exporter/signalfxexporter/internal/translation/translator_test.go index c4956efae1dd6..1e5cf84808197 100644 --- a/exporter/signalfxexporter/internal/translation/translator_test.go +++ b/exporter/signalfxexporter/internal/translation/translator_test.go @@ -2959,6 +2959,382 @@ func TestDropDimensionsErrorCases(t *testing.T) { } } +func TestAddDimensions(t *testing.T) { + tests := []struct { + name string + rules []Rule + inputDps []*sfxpb.DataPoint + expectedDps []*sfxpb.DataPoint + }{ + { + name: "Add dimensions to metrics that match specified metric names", + rules: []Rule{ + { + Action: ActionAddDimensions, + MetricName: "/metric.*/", + MetricNames: map[string]bool{ + "testmetric": true, + }, + DimensionPairs: map[string]map[string]bool{ + "dim_key1": nil, + "dim_key2": { + "dim_val1": true, + "dim_val2": true, + }, + }, + }, + }, + inputDps: []*sfxpb.DataPoint{ + { + Metric: "metric1", + Dimensions: []*sfxpb.Dimension{}, + }, + { + Metric: "metrik1", + Dimensions: []*sfxpb.Dimension{}, + }, + { + Metric: "testmetric", + Dimensions: []*sfxpb.Dimension{}, + }, + }, + expectedDps: []*sfxpb.DataPoint{ + { + Metric: "metric1", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + }, + }, + { + Metric: "metrik1", + Dimensions: []*sfxpb.Dimension{}, + }, + { + Metric: "testmetric", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + }, + }, + }, + }, + { + name: "Add dimensions to all metrics without filtering by metric name", + rules: []Rule{ + { + Action: ActionAddDimensions, + DimensionPairs: map[string]map[string]bool{ + "dim_key1": nil, + "dim_key2": { + "dim_val1": true, + "dim_val2": true, + }, + }, + }, + }, + inputDps: []*sfxpb.DataPoint{ + { + Metric: "metric1", + Dimensions: []*sfxpb.Dimension{}, + }, + { + Metric: "metric2", + Dimensions: []*sfxpb.Dimension{}, + }, + { + Metric: "testmetric", + Dimensions: []*sfxpb.Dimension{}, + }, + }, + expectedDps: []*sfxpb.DataPoint{ + { + Metric: "metric1", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + }, + }, + { + Metric: "metric2", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + }, + }, + { + Metric: "testmetric", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + }, + }, + }, + }, + { + name: "Ensure metrics are unchanged if given names don't matches", + rules: []Rule{ + { + Action: ActionAddDimensions, + MetricName: "foo", + DimensionPairs: map[string]map[string]bool{ + "dim_key1": { + "dim_val1": true, + "dim_val2": true, + }, + }, + }, + }, + inputDps: []*sfxpb.DataPoint{ + { + Metric: "metric1", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + }, + }, + { + Metric: "metric2", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "dim_val2", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + }, + }, + }, + expectedDps: []*sfxpb.DataPoint{ + { + Metric: "metric1", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + }, + }, + { + Metric: "metric2", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "dim_val2", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + }, + }, + }, + }, + { + name: "Ensure dimensions are added even if dimension key:value already exists", + rules: []Rule{ + { + Action: ActionAddDimensions, + DimensionPairs: map[string]map[string]bool{ + "dim_key1": { + "dim_val1": true, + "dim_val2": true, + }, + }, + }, + }, + inputDps: []*sfxpb.DataPoint{ + { + Metric: "metric1", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + }, + }, + { + Metric: "metric2", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "dim_val2", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + }, + }, + }, + expectedDps: []*sfxpb.DataPoint{ + { + Metric: "metric1", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "dim_val1", + }, + { + Key: "dim_key2", + Value: "dim_val1", + }, + { + Key: "dim_key1", + Value: "dim_val1", + }, + { + Key: "dim_key1", + Value: "dim_val2", + }, + }, + }, + { + Metric: "metric2", + Dimensions: []*sfxpb.Dimension{ + { + Key: "dim_key1", + Value: "dim_val2", + }, + { + Key: "dim_key2", + Value: "dim_val2", + }, + { + Key: "dim_key1", + Value: "dim_val1", + }, + { + Key: "dim_key1", + Value: "dim_val2", + }, + }, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mt, err := NewMetricTranslator(test.rules, 1) + require.NoError(t, err) + outputSFxDps := mt.TranslateDataPoints(zap.NewNop(), test.inputDps) + require.Equal(t, len(test.expectedDps), len(outputSFxDps)) + + for _, outputDp := range outputSFxDps { + for _, expectedDp := range test.expectedDps { + // Need to make sure dimensions match for right metrics. + if outputDp.Metric == expectedDp.Metric { + // Input dimension format is a map, so we can't guarantee order. This + // check doesn't assume order. + require.True(t, dimensionsEqual(outputDp.Dimensions, expectedDp.Dimensions)) + } + } + } + }) + } +} + +func TestAddDimensionsErrorCases(t *testing.T) { + tests := []struct { + name string + rules []Rule + expectedError string + }{ + { + name: "Test with invalid metric name pattern", + rules: []Rule{ + { + Action: ActionAddDimensions, + MetricName: "/metric.*(/", + DimensionPairs: map[string]map[string]bool{ + "dim_key1": nil, + "dim_key2": { + "dim_val1": true, + "dim_val2": true, + }, + }, + }, + }, + expectedError: "failed creating metric matcher: error parsing regexp: missing closing ): `metric.*(`", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + mt, err := NewMetricTranslator(test.rules, 1) + require.EqualError(t, err, test.expectedError) + require.Nil(t, mt) + }) + } +} + func testConverter(t *testing.T, mapping map[string]string) *MetricsConverter { rules := []Rule{{ Action: ActionDeltaMetric, diff --git a/unreleased/signalfx_exporter_remove_direction.yaml b/unreleased/signalfx_exporter_remove_direction.yaml new file mode 100755 index 0000000000000..b0d1712e06591 --- /dev/null +++ b/unreleased/signalfx_exporter_remove_direction.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: signalfxexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support removal of direction attribute + +# One or more tracking issues related to the change +issues: [12641] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: From ee3684eb8affb26ade065c031caf8a677e9e0f71 Mon Sep 17 00:00:00 2001 From: Curtis Robert Date: Thu, 21 Jul 2022 16:44:26 -0700 Subject: [PATCH 2/2] Undo testing change to delta timeout --- exporter/signalfxexporter/factory_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/signalfxexporter/factory_test.go b/exporter/signalfxexporter/factory_test.go index 28ee70c002a1e..22a78446d5fc3 100644 --- a/exporter/signalfxexporter/factory_test.go +++ b/exporter/signalfxexporter/factory_test.go @@ -188,7 +188,7 @@ func TestDefaultTranslationRules(t *testing.T) { rules, err := loadDefaultTranslationRules() require.NoError(t, err) require.NotNil(t, rules, "rules are nil") - tr, err := translation.NewMetricTranslator(rules, 600) + tr, err := translation.NewMetricTranslator(rules, 1) require.NoError(t, err) data := testMetricsData()