Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 10 additions & 14 deletions exporter/opencensusexporter/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,19 @@ func (oce *ocExporter) pushMetricsData(_ context.Context, md pdata.Metrics) erro
}
}

ocmds := internaldata.MetricsToOC(md)
for _, ocmd := range ocmds {
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
ocReq := agentmetricspb.ExportMetricsServiceRequest{}
ocReq.Node, ocReq.Resource, ocReq.Metrics = internaldata.ResourceMetricsToOC(rms.At(i))

// This is a hack because OC protocol expects a Node for the initial message.
node := ocmd.Node
if node == nil {
node = &commonpb.Node{}
}
resource := ocmd.Resource
if resource == nil {
resource = &resourcepb.Resource{}
if ocReq.Node == nil {
ocReq.Node = &commonpb.Node{}
}
req := &agentmetricspb.ExportMetricsServiceRequest{
Metrics: ocmd.Metrics,
Resource: resource,
Node: node,
if ocReq.Resource == nil {
ocReq.Resource = &resourcepb.Resource{}
}
if err := mClient.msec.Send(req); err != nil {
if err := mClient.msec.Send(&ocReq); err != nil {
// Error received, cancel the context used to create the RPC to free all resources,
// put back nil to keep the number of workers constant.
mClient.cancel()
Expand Down
8 changes: 4 additions & 4 deletions exporter/prometheusexporter/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ func TestPrometheusExporter_endToEnd(t *testing.T) {
require.NoError(t, exp.Start(context.Background(), componenttest.NewNopHost()))

// Should accumulate multiple metrics
md := internaldata.OCToMetrics(internaldata.MetricsData{Metrics: metricBuilder(128, "metric_1_")})
md := internaldata.OCToMetrics(nil, nil, metricBuilder(128, "metric_1_"))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))

for delta := 0; delta <= 20; delta += 10 {
md := internaldata.OCToMetrics(internaldata.MetricsData{Metrics: metricBuilder(int64(delta), "metric_2_")})
md := internaldata.OCToMetrics(nil, nil, metricBuilder(int64(delta), "metric_2_"))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))

res, err1 := http.Get("http://localhost:7777/metrics")
Expand Down Expand Up @@ -208,11 +208,11 @@ func TestPrometheusExporter_endToEndWithTimestamps(t *testing.T) {

// Should accumulate multiple metrics

md := internaldata.OCToMetrics(internaldata.MetricsData{Metrics: metricBuilder(128, "metric_1_")})
md := internaldata.OCToMetrics(nil, nil, metricBuilder(128, "metric_1_"))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))

for delta := 0; delta <= 20; delta += 10 {
md := internaldata.OCToMetrics(internaldata.MetricsData{Metrics: metricBuilder(int64(delta), "metric_2_")})
md := internaldata.OCToMetrics(nil, nil, metricBuilder(int64(delta), "metric_2_"))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))

res, err1 := http.Get("http://localhost:7777/metrics")
Expand Down
19 changes: 6 additions & 13 deletions receiver/opencensusreceiver/ocmetrics/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1"
agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
ocmetrics "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"

"go.opentelemetry.io/collector/component/componenterror"
Expand Down Expand Up @@ -116,36 +117,28 @@ func (ocr *Receiver) processReceivedMsg(
resource = recv.Resource
}

md := internaldata.MetricsData{
Node: lastNonNilNode,
Resource: resource,
Metrics: recv.Metrics,
}

err := ocr.sendToNextConsumer(longLivedRPCCtx, md)
err := ocr.sendToNextConsumer(longLivedRPCCtx, lastNonNilNode, resource, recv.Metrics)
return lastNonNilNode, resource, err
}

func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, md internaldata.MetricsData) error {
func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *commonpb.Node, resource *resourcepb.Resource, metrics []*ocmetrics.Metric) error {
ctx := obsreport.StartMetricsReceiveOp(
longLivedRPCCtx,
ocr.id,
receiverTransport,
obsreport.WithLongLivedCtx())

numTimeSeries := 0
numPoints := 0
// Count number of time series and data points.
for _, metric := range md.Metrics {
numTimeSeries += len(metric.Timeseries)
for _, metric := range metrics {
for _, ts := range metric.GetTimeseries() {
numPoints += len(ts.GetPoints())
}
}

var consumerErr error
if len(md.Metrics) > 0 {
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(md))
if len(metrics) > 0 {
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(node, resource, metrics))
}

obsreport.EndMetricsReceiveOp(
Expand Down
14 changes: 8 additions & 6 deletions receiver/opencensusreceiver/ocmetrics/opencensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,10 @@ func TestExportMultiplexing(t *testing.T) {
// Examination time!
resultsMapping := make(map[string][]*metricspb.Metric)
for _, md := range metricSink.AllMetrics() {
ocmds := internaldata.MetricsToOC(md)
for _, ocmd := range ocmds {
resultsMapping[nodeToKey(ocmd.Node)] = append(resultsMapping[nodeToKey(ocmd.Node)], ocmd.Metrics...)
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
node, _, metrics := internaldata.ResourceMetricsToOC(rms.At(i))
resultsMapping[nodeToKey(node)] = append(resultsMapping[nodeToKey(node)], metrics...)
}
}

Expand Down Expand Up @@ -292,9 +293,10 @@ func TestExportProtocolConformation_metricsInFirstMessage(t *testing.T) {
// Examination time!
resultsMapping := make(map[string][]*metricspb.Metric)
for _, md := range metricSink.AllMetrics() {
ocmds := internaldata.MetricsToOC(md)
for _, ocmd := range ocmds {
resultsMapping[nodeToKey(ocmd.Node)] = append(resultsMapping[nodeToKey(ocmd.Node)], ocmd.Metrics...)
rms := md.ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
node, _, metrics := internaldata.ResourceMetricsToOC(rms.At(i))
resultsMapping[nodeToKey(node)] = append(resultsMapping[nodeToKey(node)], metrics...)
}
}

Expand Down
6 changes: 1 addition & 5 deletions receiver/prometheusreceiver/internal/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,7 @@ func (tr *transaction) Commit() error {

numPoints := 0
if len(metrics) > 0 {
md := internaldata.OCToMetrics(internaldata.MetricsData{
Node: tr.node,
Resource: tr.resource,
Metrics: metrics,
})
md := internaldata.OCToMetrics(tr.node, tr.resource, metrics)
_, numPoints = md.MetricAndDataPointCount()
err = tr.sink.ConsumeMetrics(ctx, md)
}
Expand Down
12 changes: 9 additions & 3 deletions receiver/prometheusreceiver/internal/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"testing"
"time"

agentmetricspb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/metrics/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/scrape"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/config"
Expand Down Expand Up @@ -117,10 +119,14 @@ func Test_transaction(t *testing.T) {
if len(mds) != 1 {
t.Fatalf("wanted one batch, got %v\n", sink.AllMetrics())
}
ocmds := internaldata.MetricsToOC(mds[0])
if len(ocmds) != 1 {
t.Fatalf("wanted one batch per node, got %v\n", sink.AllMetrics())
var ocmds []*agentmetricspb.ExportMetricsServiceRequest
rms := mds[0].ResourceMetrics()
for i := 0; i < rms.Len(); i++ {
ocmd := &agentmetricspb.ExportMetricsServiceRequest{}
ocmd.Node, ocmd.Resource, ocmd.Metrics = internaldata.ResourceMetricsToOC(rms.At(i))
ocmds = append(ocmds, ocmd)
}
require.Len(t, ocmds, 1)
if !proto.Equal(ocmds[0].Node, expectedNode) {
t.Errorf("generated node %v and expected node %v is different\n", ocmds[0].Node, expectedNode)
}
Expand Down
Loading