Skip to content

Commit f7b6017

Browse files
authored
Don't free buffers after reading query stream (#9721)
Follow-up on #9401. While the write path has been carefully crafted to make sure that we don't keep references to the strings backed by arrays holding data from the gRPC stream (because it would cause a memory leak), this doesn't seem to be the case in the query path, where some tests started failing after we started to reuse those backing arrays through the memory buffers implemented in the new gRPC library. This change reverts the buffer freeing, which means the buffer won't be recycled (it will be garbage collected instead), in order to ensure data correctness while we investigate where the data references are kept. Signed-off-by: Oleg Zaytsev <[email protected]>
1 parent 9370c58 commit f7b6017

File tree

1 file changed

+0
-10
lines changed

1 file changed

+0
-10
lines changed

pkg/distributor/query.go

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -269,7 +269,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
269269
if len(resp.Timeseries) > 0 {
270270
for _, series := range resp.Timeseries {
271271
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
272-
resp.FreeBuffer()
273272
return ingesterQueryResult{}, limitErr
274273
}
275274
}
@@ -278,24 +277,20 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
278277
} else if len(resp.Chunkseries) > 0 {
279278
// Enforce the max chunks limits.
280279
if err := queryLimiter.AddChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
281-
resp.FreeBuffer()
282280
return ingesterQueryResult{}, err
283281
}
284282

285283
if err := queryLimiter.AddEstimatedChunks(ingester_client.ChunksCount(resp.Chunkseries)); err != nil {
286-
resp.FreeBuffer()
287284
return ingesterQueryResult{}, err
288285
}
289286

290287
for _, series := range resp.Chunkseries {
291288
if err := queryLimiter.AddSeries(series.Labels); err != nil {
292-
resp.FreeBuffer()
293289
return ingesterQueryResult{}, err
294290
}
295291
}
296292

297293
if err := queryLimiter.AddChunkBytes(ingester_client.ChunksSize(resp.Chunkseries)); err != nil {
298-
resp.FreeBuffer()
299294
return ingesterQueryResult{}, err
300295
}
301296

@@ -306,18 +301,15 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
306301

307302
for _, s := range resp.StreamingSeries {
308303
if err := queryLimiter.AddSeries(s.Labels); err != nil {
309-
resp.FreeBuffer()
310304
return ingesterQueryResult{}, err
311305
}
312306

313307
// We enforce the chunk count limit here, but enforce the chunk bytes limit while streaming the chunks themselves.
314308
if err := queryLimiter.AddChunks(int(s.ChunkCount)); err != nil {
315-
resp.FreeBuffer()
316309
return ingesterQueryResult{}, err
317310
}
318311

319312
if err := queryLimiter.AddEstimatedChunks(int(s.ChunkCount)); err != nil {
320-
resp.FreeBuffer()
321313
return ingesterQueryResult{}, err
322314
}
323315

@@ -327,8 +319,6 @@ func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSets [
327319
streamingSeriesBatches = append(streamingSeriesBatches, labelsBatch)
328320
}
329321

330-
resp.FreeBuffer()
331-
332322
if resp.IsEndOfSeriesStream {
333323
if streamingSeriesCount > 0 {
334324
result.streamingSeries.Series = make([]labels.Labels, 0, streamingSeriesCount)

0 commit comments

Comments
 (0)