Skip to content

Commit d79ea9f

Browse files
committed
Add custom CodecV2
Signed-off-by: Arve Knudsen <[email protected]>
1 parent b9a6e38 commit d79ea9f

File tree

5 files changed

+167
-19
lines changed

5 files changed

+167
-19
lines changed

pkg/ingester/client/buffering_client_test.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -265,24 +265,16 @@ func (ms *mockServer) Push(_ context.Context, r *mimirpb.WriteRequest) (*mimirpb
265265
ms.mu.Lock()
266266
defer ms.mu.Unlock()
267267

268-
d, err := r.Marshal()
269-
if err != nil {
270-
return nil, fmt.Errorf("marshal WriteRequest: %w", err)
271-
}
272-
var c mimirpb.WriteRequest
273-
if err := c.Unmarshal(d); err != nil {
274-
return nil, fmt.Errorf("unmarshal WriteRequest: %w", err)
275-
}
276-
// Clear unmarshal data, to ensure equality.
277-
c.ClearTimeseriesUnmarshalData()
278-
ms.reqs = append(ms.reqs, &c)
268+
// Clear unmarshal data. We don't need it and it breaks equality check in test.
269+
r.ClearTimeseriesUnmarshalData()
270+
ms.reqs = append(ms.reqs, r)
279271

280272
if ms.trackSamples {
281273
if ms.samplesPerSeries == nil {
282274
ms.samplesPerSeries = map[string][]mimirpb.Sample{}
283275
}
284276

285-
for _, ts := range c.Timeseries {
277+
for _, ts := range r.Timeseries {
286278
ser := mimirpb.FromLabelAdaptersToLabels(ts.Labels).String()
287279
ms.samplesPerSeries[ser] = append(ms.samplesPerSeries[ser], ts.Samples...)
288280
}

pkg/ingester/ingester.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ import (
5050
"go.uber.org/atomic"
5151
"golang.org/x/exp/slices"
5252
"golang.org/x/sync/errgroup"
53+
"google.golang.org/grpc/encoding"
54+
"google.golang.org/grpc/encoding/proto"
5355

5456
"github.com/grafana/mimir/pkg/ingester/activeseries"
5557
asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
@@ -3835,7 +3837,11 @@ func (i *Ingester) checkAvailableForPush() error {
38353837

38363838
// PushToStorage implements ingest.Pusher interface for ingestion via ingest-storage.
38373839
func (i *Ingester) PushToStorage(ctx context.Context, req *mimirpb.WriteRequest) error {
3838-
err := i.PushWithCleanup(ctx, req, func() { mimirpb.ReuseSlice(req.Timeseries) })
3840+
err := i.PushWithCleanup(ctx, req, func() {
3841+
codec := encoding.GetCodecV2(proto.Name).(*mimirpb.CodecV2)
3842+
codec.FreeWriteRequest(req)
3843+
mimirpb.ReuseSlice(req.Timeseries)
3844+
})
38393845
if err != nil {
38403846
return mapPushErrorToErrorWithStatus(err)
38413847
}

pkg/ingester/ingester_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ import (
5858
"golang.org/x/sync/errgroup"
5959
"google.golang.org/grpc"
6060
"google.golang.org/grpc/codes"
61+
"google.golang.org/grpc/encoding"
62+
"google.golang.org/grpc/encoding/proto"
6163

6264
asmodel "github.com/grafana/mimir/pkg/ingester/activeseries/model"
6365
"github.com/grafana/mimir/pkg/ingester/client"
@@ -3211,8 +3213,11 @@ func TestIngester_Push(t *testing.T) {
32113213

32123214
// Push timeseries
32133215
for idx, req := range testData.reqs {
3214-
// Push metrics to the ingester. Override the default cleanup method of mimirpb.ReuseSlice with a no-op one.
3215-
err := i.PushWithCleanup(ctx, req, func() {})
3216+
// Push metrics to the ingester.
3217+
err := i.PushWithCleanup(ctx, req, func() {
3218+
codec := encoding.GetCodecV2(proto.Name).(*mimirpb.CodecV2)
3219+
codec.FreeWriteRequest(req)
3220+
})
32163221

32173222
// We expect no error on any request except the last one
32183223
// which may error (and in that case we assert on it)

pkg/mimirpb/custom.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,156 @@ import (
66
"bytes"
77
"fmt"
88
"math"
9+
"sync"
10+
"unsafe"
911

1012
"github.com/prometheus/prometheus/model/histogram"
13+
"google.golang.org/grpc/encoding"
14+
"google.golang.org/grpc/encoding/proto"
15+
"google.golang.org/grpc/mem"
1116
)
1217

18+
func init() {
19+
c := encoding.GetCodecV2(proto.Name)
20+
encoding.RegisterCodecV2(&CodecV2{
21+
c: c,
22+
refs: map[uintptr]mem.BufferSlice{},
23+
})
24+
}
25+
26+
type CodecV2 struct {
27+
c encoding.CodecV2
28+
29+
mtx sync.Mutex
30+
refs map[uintptr]mem.BufferSlice
31+
}
32+
33+
func (*CodecV2) Name() string {
34+
return proto.Name
35+
}
36+
37+
func (c *CodecV2) Marshal(v any) (out mem.BufferSlice, err error) {
38+
return c.c.Marshal(v)
39+
}
40+
41+
func (c *CodecV2) Unmarshal(data mem.BufferSlice, v any) error {
42+
if err := c.c.Unmarshal(data, v); err != nil {
43+
return err
44+
}
45+
46+
switch fmt.Sprintf("%T", v) {
47+
case "*alertmanagerpb.ReadStateRequest":
48+
case "*alertmanagerpb.ReadStateResponse":
49+
case "*alertmanagerpb.UpdateStateResponse":
50+
case "*client.ActiveSeriesRequest":
51+
case "*client.ActiveSeriesResponse":
52+
case "*client.ExemplarQueryRequest":
53+
case "*client.ExemplarQueryResponse":
54+
case "*client.LabelNamesRequest":
55+
case "*client.LabelNamesResponse":
56+
case "*client.LabelNamesAndValuesRequest":
57+
case "*client.LabelNamesAndValuesResponse":
58+
case "*client.LabelValuesCardinalityRequest":
59+
case "*client.LabelValuesCardinalityResponse":
60+
case "*client.LabelValuesRequest":
61+
case "*client.LabelValuesResponse":
62+
case "*client.MetricsForLabelMatchersRequest":
63+
case "*client.MetricsForLabelMatchersResponse":
64+
case "*client.QueryRequest":
65+
case "*client.QueryStreamResponse":
66+
case "*client.UserStatsRequest":
67+
case "*client.UserStatsResponse":
68+
case "*clusterpb.Part":
69+
case "*etcdserverpb.DeleteRangeResponse":
70+
case "*etcdserverpb.RangeResponse":
71+
case "*etcdserverpb.TxnResponse":
72+
case "*etcdserverpb.WatchResponse":
73+
case "*emptypb.Empty":
74+
case "*frontendv1pb.ClientToFrontend":
75+
case "*frontendv1pb.FrontendToClient":
76+
case "*frontendv1pb.NotifyClientShutdownRequest":
77+
case "*frontendv1pb.NotifyClientShutdownResponse":
78+
case "*frontendv2pb.QueryResultRequest":
79+
case "*frontendv2pb.QueryResultResponse":
80+
case "*grpc_health_v1.HealthCheckRequest":
81+
case "*grpc_health_v1.HealthCheckResponse":
82+
case "*httpgrpc.HTTPRequest":
83+
case "*httpgrpc.HTTPResponse":
84+
case "*mimirpb.WriteRequest":
85+
case "*mimirpb.WriteResponse":
86+
case "*ruler.RulesRequest":
87+
case "*ruler.RulesResponse":
88+
case "*ruler.SyncRulesRequest":
89+
case "*ruler.SyncRulesResponse":
90+
case "*schedulerpb.FrontendToScheduler":
91+
case "*schedulerpb.NotifyQuerierShutdownRequest":
92+
case "*schedulerpb.NotifyQuerierShutdownResponse":
93+
case "*schedulerpb.QuerierToScheduler":
94+
case "*schedulerpb.SchedulerToFrontend":
95+
case "*schedulerpb.SchedulerToQuerier":
96+
case "*storepb.LabelNamesRequest":
97+
case "*storepb.LabelValuesRequest":
98+
case "*storepb.LabelValuesResponse":
99+
case "*storepb.SeriesRequest":
100+
case "*storepb.SeriesResponse":
101+
default:
102+
panic(fmt.Errorf("unrecognized protobuf message %T", v))
103+
}
104+
105+
/*
106+
switch x := v.(type) {
107+
case *WriteRequest:
108+
data.Ref()
109+
c.mtx.Lock()
110+
defer c.mtx.Unlock()
111+
c.refs[uintptr(unsafe.Pointer(x))] = data
112+
case *PreallocTimeseries:
113+
panic("unmarshaling PreallocTimeseries")
114+
case *LabelAdapter:
115+
panic("unmarshaling label adapter")
116+
}
117+
*/
118+
119+
return nil
120+
}
121+
122+
// FreeWriteRequest frees a previously unmarshaled WriteRequest.
123+
func (c *CodecV2) FreeWriteRequest(req *WriteRequest) {
124+
c.mtx.Lock()
125+
defer c.mtx.Unlock()
126+
127+
ptr := uintptr(unsafe.Pointer(&req))
128+
c.refs[ptr].Free()
129+
delete(c.refs, ptr)
130+
131+
for i, ts := range req.Timeseries {
132+
ptr := uintptr(unsafe.Pointer(&req.Timeseries[i]))
133+
c.refs[ptr].Free()
134+
delete(c.refs, ptr)
135+
136+
for j := range ts.Labels {
137+
ptr := uintptr(unsafe.Pointer(&ts.Labels[j]))
138+
c.refs[ptr].Free()
139+
delete(c.refs, ptr)
140+
}
141+
142+
for _, e := range ts.Exemplars {
143+
for j := range e.Labels {
144+
ptr := uintptr(unsafe.Pointer(&e.Labels[j]))
145+
c.refs[ptr].Free()
146+
delete(c.refs, ptr)
147+
}
148+
}
149+
}
150+
}
151+
152+
type UnmarshalerV2 interface {
153+
UnmarshalV2(encoding.CodecV2, mem.BufferSlice) error
154+
Free()
155+
}
156+
157+
// var _ UnmarshalerV2 = &WriteRequest{}
158+
13159
// MinTimestamp returns the minimum timestamp (milliseconds) among all series
14160
// in the WriteRequest. Returns math.MaxInt64 if the request is empty.
15161
func (m *WriteRequest) MinTimestamp() int64 {

pkg/mimirpb/timeseries.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,7 @@ var TimeseriesUnmarshalCachingEnabled = true
212212
// if p.skipUnmarshalingExemplars is false.
213213
func (p *PreallocTimeseries) Unmarshal(dAtA []byte) error {
214214
if TimeseriesUnmarshalCachingEnabled {
215-
p.marshalledData = make([]byte, len(dAtA))
216-
copy(p.marshalledData, dAtA)
215+
p.marshalledData = dAtA
217216
}
218217
p.TimeSeries = TimeseriesFromPool()
219218
p.TimeSeries.SkipUnmarshalingExemplars = p.skipUnmarshalingExemplars
@@ -351,7 +350,7 @@ func (bs *LabelAdapter) Unmarshal(dAtA []byte) error {
351350
if postIndex > l {
352351
return io.ErrUnexpectedEOF
353352
}
354-
bs.Name = string(dAtA[iNdEx:postIndex])
353+
bs.Name = yoloString(dAtA[iNdEx:postIndex])
355354
iNdEx = postIndex
356355
case 2:
357356
if wireType != 2 {
@@ -382,7 +381,7 @@ func (bs *LabelAdapter) Unmarshal(dAtA []byte) error {
382381
if postIndex > l {
383382
return io.ErrUnexpectedEOF
384383
}
385-
bs.Value = string(dAtA[iNdEx:postIndex])
384+
bs.Value = yoloString(dAtA[iNdEx:postIndex])
386385
iNdEx = postIndex
387386
default:
388387
iNdEx = preIndex

0 commit comments

Comments
 (0)