Skip to content

Commit accccf4

Browse files
aknuds1colega
andauthored
Upgrade to google.golang.org/grpc v1.66.2 / modify certain protobuf messages to retain their unmarshaling buffer (#9401)
* Upgrade to google.golang.org/grpc v1.66.2 --------- Signed-off-by: Arve Knudsen <[email protected]> Co-authored-by: Oleg Zaytsev <[email protected]>
1 parent fd1fe53 commit accccf4

File tree

96 files changed

+3989
-2501
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+3989
-2501
lines changed

go.mod

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ require (
4949
golang.org/x/net v0.30.0
5050
golang.org/x/sync v0.8.0
5151
golang.org/x/time v0.6.0
52-
google.golang.org/grpc v1.66.0
52+
google.golang.org/grpc v1.66.2
5353
gopkg.in/yaml.v2 v2.4.0
5454
gopkg.in/yaml.v3 v3.0.1
5555
)
@@ -316,7 +316,3 @@ replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-aler
316316
// - https://github.com/grafana/franz-go/pull/3
317317
// - https://github.com/grafana/franz-go/pull/4
318318
replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937
319-
320-
// Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions.
321-
// Following https://github.com/grafana/dskit/pull/581
322-
replace google.golang.org/grpc => google.golang.org/grpc v1.65.0

go.sum

Lines changed: 61 additions & 1121 deletions
Large diffs are not rendered by default.

pkg/distributor/distributor.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1384,6 +1384,7 @@ func NextOrCleanup(next PushFunc, pushReq *Request) (_ PushFunc, maybeCleanup fu
13841384
func (d *Distributor) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirpb.WriteResponse, error) {
13851385
pushReq := NewParsedRequest(req)
13861386
pushReq.AddCleanup(func() {
1387+
req.FreeBuffer()
13871388
mimirpb.ReuseSlice(req.Timeseries)
13881389
})
13891390

pkg/distributor/query.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
269269
if len(resp.Timeseries) > 0 {
270270
for _, series := range resp.Timeseries {
271271
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
272+
resp.FreeBuffer()
272273
return ingesterQueryResult{}, limitErr
273274
}
274275
}
@@ -277,20 +278,24 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
277278
} else if len(resp.Chunkseries) > 0 {
278279
// Enforce the max chunks limits.
279280
if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
281+
resp.FreeBuffer()
280282
return ingesterQueryResult{}, err
281283
}
282284

283285
if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
286+
resp.FreeBuffer()
284287
return ingesterQueryResult{}, err
285288
}
286289

287290
for _, series := range resp.Chunkseries {
288291
if err := queryLimiter.AddSeries(series.Labels); err != nil {
292+
resp.FreeBuffer()
289293
return ingesterQueryResult{}, err
290294
}
291295
}
292296

293297
if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
298+
resp.FreeBuffer()
294299
return ingesterQueryResult{}, err
295300
}
296301

@@ -301,15 +306,18 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
301306

302307
for _, s := range resp.StreamingSeries {
303308
if err := queryLimiter.AddSeries(s.Labels); err != nil {
309+
resp.FreeBuffer()
304310
return ingesterQueryResult{}, err
305311
}
306312

307313
// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
308314
if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
315+
resp.FreeBuffer()
309316
return ingesterQueryResult{}, err
310317
}
311318

312319
if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
320+
resp.FreeBuffer()
313321
return ingesterQueryResult{}, err
314322
}
315323

@@ -319,6 +327,8 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
319327
streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
320328
}
321329

330+
resp.FreeBuffer()
331+
322332
if resp.IsEndOfSeriesStream {
323333
if streamingSeriesCount > 0 {
324334
result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount)

pkg/frontend/querymiddleware/model.pb.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
diff --git a/pkg/frontend/querymiddleware/model.pb.go b/pkg/frontend/querymiddleware/model.pb.go
2+
index 315ed4eed..47f80838c 100644
3+
--- a/pkg/frontend/querymiddleware/model.pb.go
4+
+++ b/pkg/frontend/querymiddleware/model.pb.go
5+
@@ -83,9 +83,6 @@ func (m *PrometheusHeader) GetValues() []string {
6+
}
7+
8+
type PrometheusResponse struct {
9+
- // Keep reference to buffer for unsafe references.
10+
- github_com_grafana_mimir_pkg_mimirpb.BufferHolder
11+
-
12+
Status string `protobuf:"bytes,1,opt,name=Status,proto3" json:"status"`
13+
Data *PrometheusData `protobuf:"bytes,2,opt,name=Data,proto3" json:"data,omitempty"`
14+
ErrorType string `protobuf:"bytes,3,opt,name=ErrorType,proto3" json:"errorType,omitempty"`

pkg/ingester/client/buffering_client_test.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ import (
1212
"time"
1313

1414
"github.com/gogo/protobuf/proto"
15+
"github.com/google/go-cmp/cmp"
16+
"github.com/google/go-cmp/cmp/cmpopts"
1517
"github.com/prometheus/prometheus/model/labels"
1618
"github.com/stretchr/testify/require"
1719
"google.golang.org/grpc"
@@ -68,7 +70,12 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
6870
}
6971

7072
reqs := serv.requests()
71-
require.Equal(t, requestsToSend, reqs)
73+
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
74+
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
75+
return a.TimeSeries.Equal(b.TimeSeries)
76+
}))
77+
}))
78+
require.Empty(t, diff)
7279
})
7380

7481
t.Run("push with pooling", func(t *testing.T) {
@@ -85,7 +92,12 @@ func TestWriteRequestBufferingClient_Push(t *testing.T) {
8592
}
8693

8794
reqs := serv.requests()
88-
require.Equal(t, requestsToSend, reqs)
95+
diff := cmp.Diff(requestsToSend, reqs, cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
96+
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
97+
return a.TimeSeries.Equal(b.TimeSeries)
98+
}))
99+
}))
100+
require.Empty(t, diff)
89101

90102
// Verify that pool was used.
91103
require.Greater(t, pool.Gets.Load(), int64(0))
@@ -149,7 +161,12 @@ func TestWriteRequestBufferingClient_Push_WithMultipleMarshalCalls(t *testing.T)
149161
_, err := bufferingClient.Push(ctx, req)
150162
require.NoError(t, err)
151163

152-
require.Equal(t, serv.requests(), []*mimirpb.WriteRequest{req})
164+
diff := cmp.Diff([]*mimirpb.WriteRequest{req}, serv.requests(), cmp.Comparer(func(a, b *mimirpb.WriteRequest) bool {
165+
return cmp.Equal(*a, *b, cmpopts.IgnoreUnexported(mimirpb.WriteRequest{}), cmpopts.IgnoreUnexported(mimirpb.BufferHolder{}), cmp.Comparer(func(a, b mimirpb.PreallocTimeseries) bool {
166+
return a.TimeSeries.Equal(b.TimeSeries)
167+
}))
168+
}))
169+
require.Empty(t, diff)
153170

154171
// Verify that all buffers from the pool were returned.
155172
require.Greater(t, pool.Gets.Load(), int64(0))

pkg/ingester/client/ingester.pb.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
diff --git a/pkg/ingester/client/ingester.pb.go b/pkg/ingester/client/ingester.pb.go
2+
index 9398a5d80..bbefc14b1 100644
3+
--- a/pkg/ingester/client/ingester.pb.go
4+
+++ b/pkg/ingester/client/ingester.pb.go
5+
@@ -582,9 +582,6 @@ func (m *ActiveSeriesRequest) GetType() ActiveSeriesRequest_RequestType {
6+
}
7+
8+
type QueryResponse struct {
9+
- // Keep reference to buffer for unsafe references.
10+
- mimirpb.BufferHolder
11+
-
12+
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
13+
}
14+
15+
@@ -636,9 +633,6 @@ func (m *QueryResponse) GetTimeseries() []mimirpb.TimeSeries {
16+
//
17+
// Only one of these two options will be populated.
18+
type QueryStreamResponse struct {
19+
- // Keep reference to buffer for unsafe references.
20+
- mimirpb.BufferHolder
21+
-
22+
Chunkseries []TimeSeriesChunk `protobuf:"bytes,1,rep,name=chunkseries,proto3" json:"chunkseries"`
23+
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,2,rep,name=timeseries,proto3" json:"timeseries"`
24+
StreamingSeries []QueryStreamSeries `protobuf:"bytes,3,rep,name=streaming_series,json=streamingSeries,proto3" json:"streaming_series"`
25+
@@ -809,9 +803,6 @@ func (m *QueryStreamSeriesChunks) GetChunks() []Chunk {
26+
}
27+
28+
type ExemplarQueryResponse struct {
29+
- // Keep reference to buffer for unsafe references.
30+
- mimirpb.BufferHolder
31+
-
32+
Timeseries []mimirpb.TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"`
33+
}
34+
35+
@@ -1330,9 +1321,6 @@ func (m *MetricsForLabelMatchersRequest) GetMatchersSet() []*LabelMatchers {
36+
}
37+
38+
type MetricsForLabelMatchersResponse struct {
39+
- // Keep reference to buffer for unsafe references.
40+
- mimirpb.BufferHolder
41+
-
42+
Metric []*mimirpb.Metric `protobuf:"bytes,1,rep,name=metric,proto3" json:"metric,omitempty"`
43+
}
44+
45+
@@ -1478,9 +1466,6 @@ func (m *MetricsMetadataResponse) GetMetadata() []*mimirpb.MetricMetadata {
46+
}
47+
48+
type ActiveSeriesResponse struct {
49+
- // Keep reference to buffer for unsafe references.
50+
- mimirpb.BufferHolder
51+
-
52+
Metric []*mimirpb.Metric `protobuf:"bytes,1,rep,name=metric,proto3" json:"metric,omitempty"`
53+
// bucket_count is only used when the request type was NATIVE_HISTOGRAM_SERIES.
54+
// bucket_count contains the native histogram active buckets count for each series in "metric" above.

pkg/ingester/ingester.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3855,7 +3855,10 @@ func (i *Ingester) checkAvailableForPush() error {
38553855

38563856
// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
38573857
func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
3858-
err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) })
3858+
err := i.PushWithCleanup(ctx, req, func() {
3859+
req.FreeBuffer()
3860+
mimirpb.ReuseSlice(req.Timeseries)
3861+
})
38593862
if err != nil {
38603863
return mapPushErrorToErrorWithStatus(err)
38613864
}

0 commit comments

Comments
 (0)