Skip to content

Commit c24ad37

Browse files
authored
plugin/decision: encoder return event same size as limit immediately (#7928)
In the encoder, if the event size equals the limit it was added to the buffer. Instead return it to avoid unnecessary downsize step that could lead to an infinite loop. Signed-off-by: Sebastian Spaink <[email protected]>
1 parent 162edff commit c24ad37

File tree

4 files changed

+39
-18
lines changed

4 files changed

+39
-18
lines changed

v1/plugins/logs/encoder.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (enc *chunkEncoder) Encode(event EventV1, eventBytes []byte) ([][]byte, err
135135
}
136136
}
137137

138-
if int64(len(eventBytes)+enc.bytesWritten+1) <= enc.uncompressedLimit {
138+
if int64(len(eventBytes)+enc.bytesWritten+1) < enc.uncompressedLimit {
139139
return nil, enc.appendEvent(eventBytes)
140140
}
141141

@@ -167,16 +167,12 @@ func (enc *chunkEncoder) Encode(event EventV1, eventBytes []byte) ([][]byte, err
167167
}
168168

169169
currentSize := len(result)
170-
if currentSize < int(enc.limit) {
170+
if currentSize <= int(enc.limit) {
171171
// success! the incoming chunk doesn't have to lose the ND cache and can go into a chunk by itself
172172
// scale up the uncompressed limit using the uncompressed event size as a base
173-
err = enc.appendEvent(eventBytes)
174-
if err != nil {
175-
return nil, err
176-
}
177173
enc.uncompressedLimit = int64(len(eventBytes))
178174
enc.scaleUp()
179-
return nil, nil
175+
return [][]byte{result}, nil
180176
}
181177

182178
// The ND cache has to be dropped, record this size as a known maximum event size
@@ -185,7 +181,6 @@ func (enc *chunkEncoder) Encode(event EventV1, eventBytes []byte) ([][]byte, err
185181
}
186182

187183
// 2. Drop the ND cache and see if the incoming event can fit within the current chunk without the cache (so we can maximize chunk size)
188-
enc.initialize()
189184
enc.incrMetric(encLogExUploadSizeLimitCounterName)
190185
// If there's no ND builtins cache in the event, then we don't need to retry encoding anything.
191186
if event.NDBuiltinCache == nil {

v1/plugins/logs/encoder_test.go

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func TestChunkEncoderSizeLimit(t *testing.T) {
281281
t.Fatal(err)
282282
}
283283
if len(chunks) != 1 {
284-
t.Errorf("Unexpected result: %v", result)
284+
t.Errorf("Unexpected chunk: %v", chunks)
285285
}
286286
// the incoming event doesn't fit, but the previous small event size is between 90-100% capacity so chunk is returned
287287
// the incoming event is too large and will be dropped
@@ -324,6 +324,31 @@ func TestChunkEncoderSizeLimit(t *testing.T) {
324324
if enc.metrics.Counter(logEncodingFailureCounterName).Value().(uint64) != 1 {
325325
t.Errorf("Expected one encoding failure but got: %v", enc.metrics.Counter(logEncodingFailureCounterName).Value().(uint64))
326326
}
327+
328+
// 179 is the size of the event compressed into a chunk by itself
329+
// the eventBytes size is 197 uncompressed, bigger than the limit
330+
// this tests that a chunk should be returned containing this single event
331+
// because after compression it adheres to the limit
332+
enc = newChunkEncoder(179).WithMetrics(metrics.New())
333+
chunks, err = enc.Encode(event, eventBytes)
334+
if err != nil {
335+
t.Fatal(err)
336+
}
337+
if len(chunks) != 1 {
338+
t.Errorf("Unexpected chunk: %v", chunks)
339+
}
340+
341+
// 198 is the size of eventBytes + 1, +1 accounting for the closing bracket
342+
// this tests that a chunk should be returned containing this single event
343+
// because after compression it adheres to the limit
344+
enc = newChunkEncoder(198).WithMetrics(metrics.New())
345+
chunks, err = enc.Encode(event, eventBytes)
346+
if err != nil {
347+
t.Fatal(err)
348+
}
349+
if len(chunks) != 1 {
350+
t.Errorf("Unexpected chunk: %v", chunks)
351+
}
327352
}
328353

329354
func TestChunkEncoderAdaptive(t *testing.T) {
@@ -348,7 +373,7 @@ func TestChunkEncoderAdaptive(t *testing.T) {
348373
expectedMaxEventsInChunk: 1,
349374
expectedScaleUpEvents: 1,
350375
expectedScaleDownEvents: 0,
351-
expectedEquiEvents: 999,
376+
expectedEquiEvents: 998,
352377
},
353378
{
354379
// 61 events can fit, but takes some guessing before it gets to the uncompressed limit 7200

v1/plugins/logs/eventBuffer.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,21 +127,22 @@ func (b *eventBuffer) Upload(ctx context.Context, client rest.Client, uploadPath
127127
}
128128

129129
for range eventLen {
130-
event := b.readEvent()
131-
if event == nil {
130+
bufItem := b.readBufItem()
131+
if bufItem == nil {
132132
break
133133
}
134134

135135
var result [][]byte
136-
if event.chunk != nil {
137-
result = [][]byte{event.chunk}
136+
if bufItem.chunk != nil {
137+
result = [][]byte{bufItem.chunk}
138138
} else {
139+
event := bufItem.EventV1
139140
eventBytes, err := json.Marshal(&event)
140141
if err != nil {
141142
return err
142143
}
143144

144-
result, err = b.enc.Encode(*event.EventV1, eventBytes)
145+
result, err = b.enc.Encode(*event, eventBytes)
145146
if err != nil {
146147
b.incrMetric(logEncodingFailureCounterName)
147148
if b.logger != nil {
@@ -189,7 +190,7 @@ func (b *eventBuffer) uploadChunks(ctx context.Context, result [][]byte, client
189190
}
190191

191192
// readEvent does a nonblocking read from the event buffer
192-
func (b *eventBuffer) readEvent() *bufferItem {
193+
func (b *eventBuffer) readBufItem() *bufferItem {
193194
select {
194195
case event := <-b.buffer:
195196
return event

v1/plugins/logs/eventBuffer_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ func TestEventBuffer_Upload(t *testing.T) {
143143
uploadSizeLimitBytes: 196, // Each test event is 195 bytes
144144
handleFunc: func(w http.ResponseWriter, r *http.Request) {
145145
events := decodeLogEvent(t, r.Body)
146-
if len(events) != 2 {
147-
t.Errorf("expected 2 events, got %d", len(events))
146+
if len(events) != 1 {
147+
t.Errorf("expected 1 events, got %d", len(events))
148148
}
149149
w.WriteHeader(http.StatusOK)
150150
},

0 commit comments

Comments
 (0)