From 90aa8c1af32d925cffad99b57452826776c847e8 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Wed, 31 Jan 2024 15:08:05 -0600 Subject: [PATCH 1/4] [chore][pkg/stanza] Remove unnecessary parameter in recombine operator --- .../transformer/recombine/recombine.go | 20 +++++++------------ .../transformer/recombine/recombine_test.go | 3 +-- 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go index 3b0a13444813f..429ac1f9d03f0 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine.go +++ b/pkg/stanza/operator/transformer/recombine/recombine.go @@ -178,7 +178,7 @@ func (r *Transformer) flushLoop() { if timeSinceFirstEntry < r.forceFlushTimeout { continue } - if err := r.flushSource(context.Background(), source, true); err != nil { + if err := r.flushSource(context.Background(), source); err != nil { r.Errorf("there was error flushing combined logs %s", err) } } @@ -241,7 +241,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { // This is the first entry in the next batch case matches && r.matchIndicatesFirst(): // Flush the existing batch - err := r.flushSource(ctx, s, true) + err := r.flushSource(ctx, s) if err != nil { return err } @@ -252,7 +252,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { // This is the last entry in a complete batch case matches && r.matchIndicatesLast(): r.addToBatch(ctx, e, s) - return r.flushSource(ctx, s, true) + return r.flushSource(ctx, s) } // This is neither the first entry of a new log, @@ -302,7 +302,7 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str batch.recombined.WriteString(s) if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize { - if err := r.flushSource(ctx, source, false); err != nil { + if err := r.flushSource(ctx, source); err != nil { r.Errorf("there was error flushing combined logs %s", err) } } @@ -326,7 +326,7 @@ func (r *Transformer) flushUncombined(ctx context.Context) { func (r *Transformer) flushAllSources(ctx context.Context) { var errs []error for source := range r.batchMap { - if err := r.flushSource(ctx, source, true); err != nil { + if err := r.flushSource(ctx, source); err != nil { errs = append(errs, err) } } @@ -337,7 +337,7 @@ func (r *Transformer) flushAllSources(ctx context.Context) { // flushSource combines the entries currently in the batch into a single entry, // then forwards them to the next operator in the pipeline -func (r *Transformer) flushSource(ctx context.Context, source string, deleteSource bool) error { +func (r *Transformer) flushSource(ctx context.Context, source string) error { batch := r.batchMap[source] // Skip flushing a combined log if the batch is empty if batch == nil { @@ -366,13 +366,7 @@ func (r *Transformer) flushSource(ctx context.Context, source string, deleteSour } r.Write(ctx, base) - if deleteSource { - r.removeBatch(source) - } else { - batch.entries = batch.entries[:0] - batch.recombined.Reset() - } - + r.removeBatch(source) return nil } diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index 90164ee4276c0..34e1ee29cc443 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -756,9 +756,8 @@ func TestSourceBatchDelete(t *testing.T) { ctx := context.Background() require.NoError(t, recombine.Process(ctx, start)) - require.NoError(t, recombine.Process(ctx, next)) require.Equal(t, 1, len(recombine.batchMap)) - require.NoError(t, recombine.flushSource(ctx, "file1", true)) + require.NoError(t, recombine.Process(ctx, next)) require.Equal(t, 0, len(recombine.batchMap)) fake.ExpectEntry(t, expect) require.NoError(t, recombine.Stop()) From cadd83109dc1ee7f957065db95d54094e07c3146 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 1 Feb 2024 10:36:09 -0600 Subject: [PATCH 2/4] [pkg/stanza] Remove flushUncombined from recombine operator --- .../transformer/recombine/recombine.go | 20 +++---------------- .../transformer/recombine/recombine_test.go | 19 ++++++++++++------ 2 files changed, 16 insertions(+), 23 deletions(-) diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go index 429ac1f9d03f0..411f4463b404d 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine.go +++ b/pkg/stanza/operator/transformer/recombine/recombine.go @@ -273,12 +273,11 @@ func (r *Transformer) matchIndicatesLast() bool { func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) { batch, ok := r.batchMap[source] if !ok { - batch = r.addNewBatch(source, e) if len(r.batchMap) >= r.maxSources { - r.Error("Batched source exceeds max source size. Flushing all batched logs. Consider increasing max_sources parameter") - r.flushUncombined(context.Background()) - return + r.Error("Too many sources. Flushing all batched logs. Consider increasing max_sources parameter") + r.flushAllSources(ctx) } + batch = r.addNewBatch(source, e) } else { // If the length of the batch is 0, this batch was flushed previously due to triggering size limit. // In this case, the firstEntryObservedTime should be updated to reset the timeout @@ -309,19 +308,6 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str } -// flushUncombined flushes all the logs in the batch individually to the -// next output in the pipeline. This is only used when there is an error -// or at shutdown to avoid dropping the logs. -func (r *Transformer) flushUncombined(ctx context.Context) { - for source := range r.batchMap { - for _, entry := range r.batchMap[source].entries { - r.Write(ctx, entry) - } - r.removeBatch(source) - } - r.ticker.Reset(r.forceFlushTimeout) -} - // flushAllSources flushes all sources. func (r *Transformer) flushAllSources(ctx context.Context) { var errs []error diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index 34e1ee29cc443..fffa50ce4c89d 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -381,15 +381,22 @@ func TestTransformer(t *testing.T) { cfg.IsLastEntry = "body == 'end'" cfg.OutputIDs = []string{"fake"} cfg.MaxSources = 1 + cfg.OverwriteWith = "oldest" + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ - entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}), - entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1, "start1", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t1.Add(10*time.Millisecond), "middle1", map[string]string{"file.path": "file1"}), + entryWithBodyAttr(t2, "start2", map[string]string{"file.path": "file2"}), + entryWithBodyAttr(t2.Add(10*time.Millisecond), "middle2", map[string]string{"file.path": "file2"}), + entryWithBodyAttr(t2.Add(20*time.Millisecond), "end2", map[string]string{"file.path": "file2"}), }, []*entry.Entry{ - entryWithBodyAttr(t1, "file1", map[string]string{"file.path": "file1"}), - entryWithBodyAttr(t2, "end", map[string]string{"file.path": "file1"}), + // First entry is booted before end comes in, but partial recombination should occur + entryWithBodyAttr(t1.Add(10*time.Millisecond), "start1\nmiddle1", map[string]string{"file.path": "file1"}), + // Second entry is flushed automatically when end comes in + entryWithBodyAttr(t2.Add(20*time.Millisecond), "start2\nmiddle2\nend2", map[string]string{"file.path": "file2"}), }, }, { @@ -591,7 +598,7 @@ func BenchmarkRecombine(b *testing.B) { for _, e := range entries { require.NoError(b, recombine.Process(ctx, e)) } - recombine.flushUncombined(ctx) + recombine.flushAllSources(ctx) } } @@ -630,7 +637,7 @@ func BenchmarkRecombineLimitTrigger(b *testing.B) { require.NoError(b, recombine.Process(ctx, next)) require.NoError(b, recombine.Process(ctx, start)) require.NoError(b, recombine.Process(ctx, next)) - recombine.flushUncombined(ctx) + recombine.flushAllSources(ctx) } } From e45a5c2c477558c847bf6899542c1a6fc8d13163 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 1 Feb 2024 10:51:10 -0600 Subject: [PATCH 3/4] [chore][pkg/stanza] Remove unnecessary slice of entries --- .../transformer/recombine/recombine.go | 33 +++++++------------ 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go index 411f4463b404d..7580f8c870554 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine.go +++ b/pkg/stanza/operator/transformer/recombine/recombine.go @@ -117,7 +117,6 @@ func (c *Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { batchPool: sync.Pool{ New: func() any { return &sourceBatch{ - entries: []*entry.Entry{}, recombined: &bytes.Buffer{}, } }, @@ -156,7 +155,8 @@ type Transformer struct { // sourceBatch contains the status info of a batch type sourceBatch struct { - entries []*entry.Entry + baseEntry *entry.Entry + numEntries int recombined *bytes.Buffer firstEntryObservedTime time.Time } @@ -279,12 +279,10 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str } batch = r.addNewBatch(source, e) } else { - // If the length of the batch is 0, this batch was flushed previously due to triggering size limit. - // In this case, the firstEntryObservedTime should be updated to reset the timeout - if len(batch.entries) == 0 { - batch.firstEntryObservedTime = e.ObservedTimestamp + batch.numEntries++ + if r.overwriteWithOldest { + batch.baseEntry = e } - batch.entries = append(batch.entries, e) } // Combine the combineField of each entry in the batch, @@ -300,7 +298,7 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str } batch.recombined.WriteString(s) - if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || len(batch.entries) >= r.maxBatchSize { + if (r.maxLogSize > 0 && int64(batch.recombined.Len()) > r.maxLogSize) || batch.numEntries >= r.maxBatchSize { if err := r.flushSource(ctx, source); err != nil { r.Errorf("there was error flushing combined logs %s", err) } @@ -330,28 +328,18 @@ func (r *Transformer) flushSource(ctx context.Context, source string) error { return nil } - if len(batch.entries) == 0 { + if batch.baseEntry == nil { r.removeBatch(source) return nil } - // Choose which entry we want to keep the rest of the fields from - var base *entry.Entry - entries := batch.entries - - if r.overwriteWithOldest { - base = entries[len(entries)-1] - } else { - base = entries[0] - } - // Set the recombined field on the entry - err := base.Set(r.combineField, batch.recombined.String()) + err := batch.baseEntry.Set(r.combineField, batch.recombined.String()) if err != nil { return err } - r.Write(ctx, base) + r.Write(ctx, batch.baseEntry) r.removeBatch(source) return nil } @@ -359,7 +347,8 @@ func (r *Transformer) flushSource(ctx context.Context, source string) error { // addNewBatch creates a new batch for the given source and adds the entry to it. func (r *Transformer) addNewBatch(source string, e *entry.Entry) *sourceBatch { batch := r.batchPool.Get().(*sourceBatch) - batch.entries = append(batch.entries[:0], e) + batch.baseEntry = e + batch.numEntries = 1 batch.recombined.Reset() batch.firstEntryObservedTime = e.ObservedTimestamp r.batchMap[source] = batch From 7ae909496cd80442925c696926b9801dc586d3c2 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 25 Jan 2024 14:36:27 -0600 Subject: [PATCH 4/4] [pkg/stanza] Fix recombine issues --- .../operator/transformer/recombine/recombine.go | 17 +++-------------- .../transformer/recombine/recombine_test.go | 16 ++++++++-------- 2 files changed, 11 insertions(+), 22 deletions(-) diff --git a/pkg/stanza/operator/transformer/recombine/recombine.go b/pkg/stanza/operator/transformer/recombine/recombine.go index 7580f8c870554..70eb32c2471ec 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine.go +++ b/pkg/stanza/operator/transformer/recombine/recombine.go @@ -201,7 +201,6 @@ func (r *Transformer) Stop() error { r.flushAllSources(ctx) close(r.chClose) - return nil } @@ -239,10 +238,9 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { switch { // This is the first entry in the next batch - case matches && r.matchIndicatesFirst(): + case matches && r.matchFirstLine: // Flush the existing batch - err := r.flushSource(ctx, s) - if err != nil { + if err := r.flushSource(ctx, s); err != nil { return err } @@ -250,7 +248,7 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { r.addToBatch(ctx, e, s) return nil // This is the last entry in a complete batch - case matches && r.matchIndicatesLast(): + case matches && !r.matchFirstLine: r.addToBatch(ctx, e, s) return r.flushSource(ctx, s) } @@ -261,14 +259,6 @@ func (r *Transformer) Process(ctx context.Context, e *entry.Entry) error { return nil } -func (r *Transformer) matchIndicatesFirst() bool { - return r.matchFirstLine -} - -func (r *Transformer) matchIndicatesLast() bool { - return !r.matchFirstLine -} - // addToBatch adds the current entry to the current batch of entries that will be combined func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source string) { batch, ok := r.batchMap[source] @@ -303,7 +293,6 @@ func (r *Transformer) addToBatch(ctx context.Context, e *entry.Entry, source str r.Errorf("there was error flushing combined logs %s", err) } } - } // flushAllSources flushes all sources. diff --git a/pkg/stanza/operator/transformer/recombine/recombine_test.go b/pkg/stanza/operator/transformer/recombine/recombine_test.go index fffa50ce4c89d..d80050f7412e9 100644 --- a/pkg/stanza/operator/transformer/recombine/recombine_test.go +++ b/pkg/stanza/operator/transformer/recombine/recombine_test.go @@ -158,7 +158,7 @@ func TestTransformer(t *testing.T) { cfg.IsFirstEntry = "$body == 'test1'" cfg.OutputIDs = []string{"fake"} cfg.OverwriteWith = "newest" - cfg.ForceFlushTimeout = 100 * time.Millisecond + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ @@ -178,7 +178,7 @@ func TestTransformer(t *testing.T) { cfg.IsFirstEntry = "body == 'start'" cfg.OutputIDs = []string{"fake"} cfg.OverwriteWith = "oldest" - cfg.ForceFlushTimeout = 100 * time.Millisecond + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ @@ -219,8 +219,8 @@ func TestTransformer(t *testing.T) { cfg := NewConfig() cfg.CombineField = entry.NewBodyField() cfg.IsFirstEntry = `body matches "^[^\\s]"` - cfg.ForceFlushTimeout = 100 * time.Millisecond cfg.OutputIDs = []string{"fake"} + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ @@ -252,8 +252,8 @@ func TestTransformer(t *testing.T) { cfg := NewConfig() cfg.CombineField = entry.NewBodyField("message") cfg.IsFirstEntry = `body.message matches "^[^\\s]"` - cfg.ForceFlushTimeout = 100 * time.Millisecond cfg.OutputIDs = []string{"fake"} + cfg.ForceFlushTimeout = 10 * time.Millisecond return cfg }(), []*entry.Entry{ @@ -287,7 +287,6 @@ func TestTransformer(t *testing.T) { cfg.CombineWith = "" cfg.IsLastEntry = "body.logtag == 'F'" cfg.OverwriteWith = "oldest" - cfg.ForceFlushTimeout = 100 * time.Millisecond cfg.OutputIDs = []string{"fake"} return cfg }(), @@ -501,17 +500,18 @@ func TestTransformer(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() op, err := tc.config.Build(testutil.Logger(t)) require.NoError(t, err) require.NoError(t, op.Start(testutil.NewUnscopedMockPersister())) - recombine := op.(*Transformer) + r := op.(*Transformer) fake := testutil.NewFakeOutput(t) - err = recombine.SetOutputs([]operator.Operator{fake}) + err = r.SetOutputs([]operator.Operator{fake}) require.NoError(t, err) for _, e := range tc.input { - require.NoError(t, recombine.Process(context.Background(), e)) + require.NoError(t, r.Process(ctx, e)) } fake.ExpectEntries(t, tc.expectedOutput)