Skip to content

Commit 2c7ea61

Browse files
perebaj, Vihas Makwana, and Arthur Sens
authored
[prometheusremotewritereceiver] concurrency bug (#43383)
#### Description

Fixed a concurrency bug in the Prometheus remote write receiver where concurrent requests with identical job/instance labels would return empty responses after the first successful request.

#### Link to tracking issue

Fixes #42159

---------

Signed-off-by: perebaj <[email protected]>
Co-authored-by: Vihas Makwana <[email protected]>
Co-authored-by: Arthur Sens <[email protected]>
1 parent 76b4c60 commit 2c7ea61

File tree

3 files changed

+207
-78
lines changed

3 files changed

+207
-78
lines changed

.chloggen/concurrency-bug.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: bug_fix
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: receiver/prometheusremotewrite
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Fixed a concurrency bug in the Prometheus remote write receiver where concurrent requests with identical job/instance labels would return empty responses after the first successful request.
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [42159]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: [user]

receiver/prometheusremotewritereceiver/receiver.go

Lines changed: 59 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -232,20 +232,62 @@ func (*prometheusRemoteWriteReceiver) parseProto(contentType string) (promconfig
232232
return promconfig.RemoteWriteProtoMsgV1, nil
233233
}
234234

235+
// getOrCreateRM returns or creates the ResourceMetrics for a job/instance pair within an HTTP request.
236+
//
237+
// Two-level cache:
238+
//
239+
// 1. reqRM (per-request): groups samples with the same job/instance into a single ResourceMetrics
240+
// during current request processing, avoiding duplication in the output.
241+
//
242+
// 2. prw.rmCache (global LRU): stores snapshots of previously seen resource attributes (from target_info),
243+
// allowing future requests to reuse enriched attributes.
244+
//
245+
// This function always creates new ResourceMetrics per request, only copying attributes
246+
// from the LRU cache when available. Never returns cached objects to avoid shared
247+
// mutation across concurrent requests.
248+
func (prw *prometheusRemoteWriteReceiver) getOrCreateRM(ls labels.Labels, otelMetrics pmetric.Metrics, reqRM map[uint64]pmetric.ResourceMetrics) (pmetric.ResourceMetrics, uint64) {
249+
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
250+
251+
if rm, ok := reqRM[hashedLabels]; ok {
252+
return rm, hashedLabels
253+
}
254+
255+
rm := otelMetrics.ResourceMetrics().AppendEmpty()
256+
if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
257+
// When the ResourceMetrics already exists in the global cache, we can reuse the previous snapshot and pass the already-seen attributes to the current request.
258+
existingRM.Resource().Attributes().CopyTo(rm.Resource().Attributes())
259+
} else {
260+
// When the ResourceMetrics does not exist in the global cache, we need to create a new one and add it to the request map.
261+
// Saving the new ResourceMetrics in the global cache to avoid creating duplicates in the next requests.
262+
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
263+
snapshot := pmetric.NewResourceMetrics()
264+
rm.Resource().Attributes().CopyTo(snapshot.Resource().Attributes())
265+
prw.rmCache.Add(hashedLabels, snapshot)
266+
}
267+
268+
reqRM[hashedLabels] = rm
269+
return rm, hashedLabels
270+
}
271+
235272
// translateV2 translates a v2 remote-write request into OTLP metrics.
236273
// translate is not feature complete.
237274
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
238275
var (
239276
badRequestErrors error
240-
otelMetrics = pmetric.NewMetrics()
241-
labelsBuilder = labels.NewScratchBuilder(0)
277+
// otelMetrics represents the final metrics, after all the processing, that will be returned by the receiver.
278+
otelMetrics = pmetric.NewMetrics()
279+
labelsBuilder = labels.NewScratchBuilder(0)
242280
// More about stats: https://github.com/prometheus/docs/blob/main/docs/specs/prw/remote_write_spec_2_0.md#required-written-response-headers
243281
// TODO: add exemplars to the stats.
244282
stats = promremote.WriteResponseStats{
245283
Confirmed: true,
246284
}
247285
// The key is composed by: resource_hash:scope_name:scope_version:metric_name:unit:type
248286
metricCache = make(map[uint64]pmetric.Metric)
287+
// modifiedResourceMetric keeps track, for each request, of which resources (identified by the job/instance hash) had their resource attributes modified — for example, through target_info.
288+
// Once the request is fully processed, only the resource attributes contained in the request’s ResourceMetrics are snapshotted back into the LRU cache.
289+
// This ensures that future requests start with the enriched resource attributes already applied.
290+
modifiedResourceMetric = make(map[uint64]pmetric.ResourceMetrics)
249291
)
250292

251293
for i := range req.Timeseries {
@@ -267,25 +309,19 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
267309
// If the metric name is equal to target_info, we use its labels as attributes of the resource
268310
// Ref: https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/#resource-attributes-1
269311
if metadata.Name == "target_info" {
270-
var rm pmetric.ResourceMetrics
271-
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
272-
273-
if existingRM, ok := prw.rmCache.Get(hashedLabels); ok {
274-
rm = existingRM
275-
} else {
276-
rm = otelMetrics.ResourceMetrics().AppendEmpty()
277-
}
278-
312+
rm, hashed := prw.getOrCreateRM(ls, otelMetrics, modifiedResourceMetric)
279313
attrs := rm.Resource().Attributes()
280-
parseJobAndInstance(attrs, ls.Get("job"), ls.Get("instance"))
281314

282315
// Add the remaining labels as resource attributes
283316
for labelName, labelValue := range ls.Map() {
284317
if labelName != "job" && labelName != "instance" && !schema.IsMetadataLabel(labelName) {
285318
attrs.PutStr(labelName, labelValue)
286319
}
287320
}
288-
prw.rmCache.Add(hashedLabels, rm)
321+
322+
snapshot := pmetric.NewResourceMetrics()
323+
attrs.CopyTo(snapshot.Resource().Attributes())
324+
prw.rmCache.Add(hashed, snapshot)
289325
continue
290326
}
291327

@@ -306,21 +342,12 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr
306342

307343
// Handle histograms separately due to their complex mixed-schema processing
308344
if ts.Metadata.Type == writev2.Metadata_METRIC_TYPE_HISTOGRAM {
309-
prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats)
345+
prw.processHistogramTimeSeries(otelMetrics, ls, ts, scopeName, scopeVersion, metricName, unit, description, metricCache, &stats, modifiedResourceMetric)
310346
continue
311347
}
312348

313349
// Handle regular metrics (gauge, counter, summary)
314-
hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
315-
existingRM, ok := prw.rmCache.Get(hashedLabels)
316-
var rm pmetric.ResourceMetrics
317-
if ok {
318-
rm = existingRM
319-
} else {
320-
rm = otelMetrics.ResourceMetrics().AppendEmpty()
321-
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
322-
prw.rmCache.Add(hashedLabels, rm)
323-
}
350+
rm, _ := prw.getOrCreateRM(ls, otelMetrics, modifiedResourceMetric)
324351

325352
resourceID := identity.OfResource(rm.Resource())
326353
metricIdentity := createMetricIdentity(
@@ -407,6 +434,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries(
407434
scopeName, scopeVersion, metricName, unit, description string,
408435
metricCache map[uint64]pmetric.Metric,
409436
stats *promremote.WriteResponseStats,
437+
modifiedRM map[uint64]pmetric.ResourceMetrics,
410438
) {
411439
// Drop classic histogram series (those with samples)
412440
if len(ts.Samples) != 0 {
@@ -415,10 +443,12 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries(
415443
return
416444
}
417445

418-
var rm pmetric.ResourceMetrics
419-
var hashedLabels uint64
420-
var resourceID identity.Resource
421-
var scope pmetric.ScopeMetrics
446+
var (
447+
hashedLabels uint64
448+
resourceID identity.Resource
449+
scope pmetric.ScopeMetrics
450+
rm pmetric.ResourceMetrics
451+
)
422452

423453
for i := range ts.Histograms {
424454
histogram := &ts.Histograms[i]
@@ -441,15 +471,7 @@ func (prw *prometheusRemoteWriteReceiver) processHistogramTimeSeries(
441471
}
442472
// Create resource if needed (only for the first valid histogram)
443473
if hashedLabels == 0 {
444-
hashedLabels = xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance"))
445-
existingRM, ok := prw.rmCache.Get(hashedLabels)
446-
if ok {
447-
rm = existingRM
448-
} else {
449-
rm = otelMetrics.ResourceMetrics().AppendEmpty()
450-
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance"))
451-
prw.rmCache.Add(hashedLabels, rm)
452-
}
474+
rm, _ = prw.getOrCreateRM(ls, otelMetrics, modifiedRM)
453475
resourceID = identity.OfResource(rm.Resource())
454476
}
455477

receiver/prometheusremotewritereceiver/receiver_test.go

Lines changed: 121 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"math"
1212
"net/http"
1313
"net/http/httptest"
14+
"strconv"
1415
"sync"
1516
"testing"
1617
"time"
@@ -1495,48 +1496,24 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) {
14951496
},
14961497
},
14971498
},
1498-
{
1499-
name: "normal metric first, target_info second",
1500-
requests: []*writev2.Request{
1501-
{
1502-
Symbols: []string{
1503-
"",
1504-
"job", "production/service_a", // 1, 2
1505-
"instance", "host1", // 3, 4
1506-
"__name__", "normal_metric", // 5, 6
1507-
"foo", "bar", // 7, 8
1508-
},
1509-
Timeseries: []writev2.TimeSeries{
1510-
{
1511-
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
1512-
LabelsRefs: []uint32{5, 6, 1, 2, 3, 4, 7, 8},
1513-
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
1514-
},
1515-
},
1516-
},
1517-
{
1518-
Symbols: []string{
1519-
"",
1520-
"job", "production/service_a", // 1, 2
1521-
"instance", "host1", // 3, 4
1522-
"machine_type", "n1-standard-1", // 5, 6
1523-
"cloud_provider", "gcp", // 7, 8
1524-
"region", "us-central1", // 9, 10
1525-
"__name__", "target_info", // 11, 12
1526-
},
1527-
Timeseries: []writev2.TimeSeries{
1528-
{
1529-
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
1530-
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12},
1531-
},
1532-
},
1533-
},
1534-
},
1535-
},
15361499
}
15371500

1501+
expectedIndex0 := func() pmetric.Metrics {
1502+
metrics := pmetric.NewMetrics()
1503+
rm := metrics.ResourceMetrics().AppendEmpty()
1504+
attrs := rm.Resource().Attributes()
1505+
attrs.PutStr("service.namespace", "production")
1506+
attrs.PutStr("service.name", "service_a")
1507+
attrs.PutStr("service.instance.id", "host1")
1508+
attrs.PutStr("machine_type", "n1-standard-1")
1509+
attrs.PutStr("cloud_provider", "gcp")
1510+
attrs.PutStr("region", "us-central1")
1511+
1512+
return metrics
1513+
}()
1514+
15381515
// expectedIndex1 holds the expected result of joining the target_info attributes with the normal metric.
1539-
expectedMetrics := func() pmetric.Metrics {
1516+
expectedIndex1 := func() pmetric.Metrics {
15401517
metrics := pmetric.NewMetrics()
15411518
rm := metrics.ResourceMetrics().AppendEmpty()
15421519
attrs := rm.Resource().Attributes()
@@ -1592,7 +1569,10 @@ func TestTargetInfoWithMultipleRequests(t *testing.T) {
15921569
assert.Equal(t, http.StatusNoContent, resp.StatusCode, string(body))
15931570
}
15941571

1595-
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics, mockConsumer.metrics[0]))
1572+
// index 0 is related to the target_info metric that is the first request.
1573+
assert.NoError(t, pmetrictest.CompareMetrics(expectedIndex0, mockConsumer.metrics[0]))
1574+
// index 1 is related to the join between the target_info and the normal metric.
1575+
assert.NoError(t, pmetrictest.CompareMetrics(expectedIndex1, mockConsumer.metrics[1]))
15961576
})
15971577
}
15981578
}
@@ -1788,7 +1768,7 @@ func TestLRUCacheResourceMetrics(t *testing.T) {
17881768
}
17891769

17901770
// As target_info and metric1 have the same job/instance, they generate the same end metric: mockConsumer.metrics[0].
1791-
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[0]))
1771+
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics1, mockConsumer.metrics[1]))
17921772
// As metric2 has a different job/instance, it generates a different end metric: mockConsumer.metrics[2]. At this point the cache is full, so it should evict the target_info entry to store metric2.
17931773
assert.NoError(t, pmetrictest.CompareMetrics(expectedMetrics2, mockConsumer.metrics[2]))
17941774
// As there is just 1 slot in the cache and the entry for metric1 was evicted, this metric1_1 should generate a new resource metric, even though it has the same job/instance as metric1.
@@ -1819,3 +1799,103 @@ func buildMetaDataMapByID(ms pmetric.Metrics) map[string]map[string]any {
18191799
}
18201800
return result
18211801
}
1802+
1803+
// TestConcurrentRequestsforSameResourceAttributes asserts the receiver and its cache work even with concurrent requests
1804+
func TestConcurrentRequestsforSameResourceAttributes(t *testing.T) {
1805+
mockConsumer := &mockConsumer{}
1806+
prwReceiver := setupMetricsReceiver(t)
1807+
prwReceiver.nextConsumer = mockConsumer
1808+
1809+
ts := httptest.NewServer(http.HandlerFunc(prwReceiver.handlePRW))
1810+
defer ts.Close()
1811+
1812+
// Create multiple requests with the same job/instance labels (triggering cache key collision)
1813+
createRequest := func(metricName string, value float64, timestamp int64) *writev2.Request {
1814+
return &writev2.Request{
1815+
Symbols: []string{
1816+
"", // 0
1817+
"__name__", metricName, // 1, 2
1818+
"job", "test_job", // 3, 4
1819+
"instance", "test_instance", // 5, 6
1820+
},
1821+
Timeseries: []writev2.TimeSeries{
1822+
{
1823+
Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE},
1824+
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6},
1825+
Samples: []writev2.Sample{{Value: value, Timestamp: timestamp}},
1826+
},
1827+
},
1828+
}
1829+
}
1830+
1831+
requests := []*writev2.Request{}
1832+
for i := 0; i < 5; i++ {
1833+
requests = append(requests, createRequest("metric_"+strconv.Itoa(i+1), float64(i+1)*10, int64(i+1)*1000))
1834+
}
1835+
1836+
var wg sync.WaitGroup
1837+
var httpResults []int
1838+
var mu sync.Mutex
1839+
1840+
for _, req := range requests {
1841+
wg.Add(1)
1842+
go func(request *writev2.Request) {
1843+
defer wg.Done()
1844+
1845+
pBuf := proto.NewBuffer(nil)
1846+
err := pBuf.Marshal(request)
1847+
assert.NoError(t, err)
1848+
1849+
resp, err := http.Post(
1850+
ts.URL,
1851+
fmt.Sprintf("application/x-protobuf;proto=%s", promconfig.RemoteWriteProtoMsgV2),
1852+
bytes.NewBuffer(pBuf.Bytes()),
1853+
)
1854+
assert.NoError(t, err)
1855+
defer resp.Body.Close()
1856+
1857+
mu.Lock()
1858+
httpResults = append(httpResults, resp.StatusCode)
1859+
mu.Unlock()
1860+
}(req)
1861+
}
1862+
wg.Wait()
1863+
1864+
// Give some time for async processing
1865+
time.Sleep(100 * time.Millisecond)
1866+
1867+
mockConsumer.mu.Lock()
1868+
totalDataPoints := mockConsumer.dataPoints
1869+
mockConsumer.mu.Unlock()
1870+
1871+
// Verify all HTTP requests succeeded
1872+
for i, status := range httpResults {
1873+
assert.Equal(t, http.StatusNoContent, status, "Request %d should return 204", i+1)
1874+
}
1875+
1876+
// The expected behavior is:
1877+
// - All HTTP requests return 204 (success)
1878+
// - All 5 data points are present (no data loss)
1879+
// - We have 5 resource attributes, each with 1 data point. The resource attributes are equal since they have the same job/instance labels.
1880+
// - The cache should have a single resource attribute.
1881+
assert.Equal(t, 5, totalDataPoints)
1882+
assert.Equal(t, 1, prwReceiver.rmCache.Len())
1883+
1884+
// Verify thread safety: Check that metrics are properly consolidated without corruption
1885+
for i, metrics := range mockConsumer.metrics {
1886+
if metrics.DataPointCount() > 0 {
1887+
resourceMetrics := metrics.ResourceMetrics()
1888+
for j := 0; j < resourceMetrics.Len(); j++ {
1889+
rm := resourceMetrics.At(j)
1890+
scopeMetrics := rm.ScopeMetrics()
1891+
for k := 0; k < scopeMetrics.Len(); k++ {
1892+
scope := scopeMetrics.At(k)
1893+
metricsCount := scope.Metrics().Len()
1894+
if metricsCount != 1 {
1895+
t.Errorf("Batch %d: Found %d datapoints when it should be 1", i+1, metricsCount)
1896+
}
1897+
}
1898+
}
1899+
}
1900+
}
1901+
}

0 commit comments

Comments
 (0)