Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 47 additions & 7 deletions receiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,46 @@ type prometheusRemoteWriteReceiver struct {
wg sync.WaitGroup
}

// MetricIdentity contains all the components that uniquely identify a metric
// according to the OpenTelemetry Protocol data model.
// The definition of the metric uniqueness is based on the following document. Ref: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#opentelemetry-protocol-data-model
type MetricIdentity struct {
ResourceID string
ScopeName string
ScopeVersion string
MetricName string
Unit string
Type writev2.Metadata_MetricType
}

// createMetricIdentity creates a MetricIdentity struct from the required components
func createMetricIdentity(resourceID, scopeName, scopeVersion, metricName, unit string, metricType writev2.Metadata_MetricType) MetricIdentity {
return MetricIdentity{
ResourceID: resourceID,
ScopeName: scopeName,
ScopeVersion: scopeVersion,
MetricName: metricName,
Unit: unit,
Type: metricType,
}
}

// Hash generates a unique hash for the metric identity
func (mi MetricIdentity) Hash() uint64 {
const separator = "\xff"

combined := strings.Join([]string{
mi.ResourceID,
mi.ScopeName,
mi.ScopeVersion,
mi.MetricName,
mi.Unit,
fmt.Sprintf("%d", mi.Type),
}, separator)

return xxhash.Sum64String(combined)
}

func (prw *prometheusRemoteWriteReceiver) Start(ctx context.Context, host component.Host) error {
mux := http.NewServeMux()
mux.HandleFunc("/api/v1/write", prw.handlePRW)
Expand Down Expand Up @@ -179,8 +219,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
// between requests based on the metric "target_info".
intraRequestCache = make(map[uint64]pmetric.ResourceMetrics)
// The key is composed by: resource_hash:scope_name:scope_version:metric_name:unit:type
// TODO: use the appropriate hash function.
metricCache = make(map[string]pmetric.Metric)
metricCache = make(map[uint64]pmetric.Metric)
)

for _, ts := range req.Timeseries {
Expand Down Expand Up @@ -221,16 +260,17 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
description := req.Symbols[ts.Metadata.HelpRef]

resourceID := identity.OfResource(rm.Resource())
// Temporary approach to generate the metric key.
// TODO: Replace this with a proper hashing function.
// The definition of the metric uniqueness is based on the following document. Ref: https://opentelemetry.io/docs/specs/otel/metrics/data-model/#opentelemetry-protocol-data-model
metricKey := fmt.Sprintf("%s:%s:%s:%s:%s:%d",

metricIdentity := createMetricIdentity(
resourceID.String(), // Resource identity
scopeName, // Scope name
scopeVersion, // Scope version
metricName, // Metric name
unit, // Unit
ts.Metadata.Type) // Metric type
ts.Metadata.Type, // Metric type
)

metricKey := metricIdentity.Hash()

var scope pmetric.ScopeMetrics
var foundScope bool
Expand Down