diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index c0158b4a5d2b4..b69f0ab9ff221 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -119,7 +119,7 @@ func (c Config) BuildWithSplitFunc(logger *zap.SugaredLogger, emit emit.Callback } // Ensure that splitter is buildable - factory := splitter.NewCustomFactory(splitFunc, c.FlushPeriod) + factory := splitter.NewCustomSplitFuncFactory(splitFunc, c.TrimConfig.Func(), c.FlushPeriod) if _, err := factory.SplitFunc(); err != nil { return nil, err } diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom.go b/pkg/stanza/fileconsumer/internal/splitter/custom.go index 52fe9125e627e..be25ddc2171e5 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom.go @@ -13,14 +13,16 @@ import ( type customFactory struct { splitFunc bufio.SplitFunc + trimFunc trim.Func flushPeriod time.Duration } var _ Factory = (*customFactory)(nil) -func NewCustomFactory(splitFunc bufio.SplitFunc, flushPeriod time.Duration) Factory { +func NewCustomSplitFuncFactory(splitFunc bufio.SplitFunc, trimFunc trim.Func, flushPeriod time.Duration) Factory { return &customFactory{ splitFunc: splitFunc, + trimFunc: trimFunc, flushPeriod: flushPeriod, } } diff --git a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go index 9e8390cc2a108..52772d4a79659 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/custom_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/custom_test.go @@ -9,27 +9,29 @@ import ( "time" "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" ) func TestCustomFactory(t *testing.T) { tests := []struct { name string - splitter bufio.SplitFunc + splitFunc bufio.SplitFunc flushPeriod time.Duration wantErr bool }{ { name: "default configuration", - splitter: func(data []byte, atEOF bool) (advance int, token []byte, err error) { + splitFunc: func(data []byte, atEOF bool) (advance int, token []byte, err error) { return len(data), data, nil }, - flushPeriod: 100 * time.Millisecond, + flushPeriod: 500 * time.Millisecond, wantErr: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - factory := NewCustomFactory(tt.splitter, tt.flushPeriod) + factory := NewCustomSplitFuncFactory(tt.splitFunc, trim.Whitespace, tt.flushPeriod) got, err := factory.SplitFunc() if (err != nil) != tt.wantErr { t.Errorf("Build() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go index fc677667e371a..886e2d5ca3f76 100644 --- a/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go +++ b/pkg/stanza/fileconsumer/internal/splitter/multiline_test.go @@ -32,7 +32,7 @@ func TestSplitFuncFactory(t *testing.T) { wantErr: false, }, { - name: "split config error", + name: "split config error", splitConfig: split.Config{ LineStartPattern: "START", LineEndPattern: "END", @@ -45,7 +45,7 @@ func TestSplitFuncFactory(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - factory := NewSplitFuncFactory(tt.splitConfig, tt.encoding, tt.maxLogSize, trim.Nop, tt.flushPeriod) + factory := NewSplitFuncFactory(tt.splitConfig, tt.encoding, tt.maxLogSize, trim.Whitespace, tt.flushPeriod) got, err := factory.SplitFunc() if (err != nil) != tt.wantErr { t.Errorf("SplitFunc() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/stanza/operator/input/tcp/tcp.go b/pkg/stanza/operator/input/tcp/tcp.go index 2a618e7d10540..dab7457ac5052 100644 --- a/pkg/stanza/operator/input/tcp/tcp.go +++ b/pkg/stanza/operator/input/tcp/tcp.go @@ -81,9 +81,8 @@ type BaseConfig struct { type SplitFuncBuilder func(enc encoding.Encoding) (bufio.SplitFunc, error) -func (c Config) defaultMultilineBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { - trimFunc := c.TrimConfig.Func() - splitFunc, err := c.SplitConfig.Func(enc, true, int(c.MaxLogSize), trimFunc) +func (c Config) defaultSplitFuncBuilder(enc encoding.Encoding) (bufio.SplitFunc, error) { + splitFunc, err := c.SplitConfig.Func(enc, true, int(c.MaxLogSize), c.TrimConfig.Func()) if err != nil { return nil, err } @@ -121,7 +120,7 @@ func (c Config) Build(logger *zap.SugaredLogger) (operator.Operator, error) { } if c.SplitFuncBuilder == nil { - c.SplitFuncBuilder = c.defaultMultilineBuilder + c.SplitFuncBuilder = c.defaultSplitFuncBuilder } // Build split func diff --git a/pkg/stanza/split/split_test.go b/pkg/stanza/split/split_test.go index 3727685538549..9022f81d23687 100644 --- a/pkg/stanza/split/split_test.go +++ b/pkg/stanza/split/split_test.go @@ -75,6 +75,13 @@ func TestConfigFunc(t *testing.T) { _, err := cfg.Func(unicode.UTF8, false, maxLogSize, trim.Nop) assert.EqualError(t, err, "compile line end regex: error parsing regexp: missing closing ]: `[`") }) + + t.Run("EncodeNewlineDstTooShort", func(t *testing.T) { + cfg := &Config{} + enc := errEncoding{} + _, err := cfg.Func(&enc, false, maxLogSize, trim.Nop) + assert.EqualError(t, err, "strange encoding") + }) } func TestLineStartSplitFunc(t *testing.T) { @@ -92,8 +99,8 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `LOGSTART \d+ `, Input: []byte(`LOGSTART 123 log1 LOGSTART 234 log2 LOGSTART 345 foo`), ExpectedTokens: []string{ - `LOGSTART 123 log1`, - `LOGSTART 234 log2`, + `LOGSTART 123 log1 `, + `LOGSTART 234 log2 `, }, }, { @@ -101,8 +108,8 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `^LOGSTART \d+ `, Input: []byte("LOGSTART 123 LOGSTART 345 log1\nLOGSTART 234 log2\nLOGSTART 345 foo"), ExpectedTokens: []string{ - "LOGSTART 123 LOGSTART 345 log1", - "LOGSTART 234 log2", + "LOGSTART 123 LOGSTART 345 log1\n", + "LOGSTART 234 log2\n", }, }, { @@ -115,7 +122,7 @@ func TestLineStartSplitFunc(t *testing.T) { Pattern: `LOGSTART \d+ `, Input: []byte(`part that doesn't match LOGSTART 123 part that matchesLOGSTART 123 foo`), ExpectedTokens: []string{ - `part that doesn't match`, + `part that doesn't match `, `LOGSTART 123 part that matches`, }, }, @@ -157,28 +164,24 @@ func TestLineStartSplitFunc(t *testing.T) { ExpectedError: errors.New("bufio.Scanner: token too long"), }, { - Name: "MultipleMultilineLogs", + Name: "MultiplesplitLogs", Pattern: `^LOGSTART \d+`, Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \nLOGSTART 17 log2\nLOGPART log2\nanother line\nLOGSTART 43 log5"), ExpectedTokens: []string{ - "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1", - "LOGSTART 17 log2\nLOGPART log2\nanother line", + "LOGSTART 12 log1\t \nLOGPART log1\nLOGPART log1\t \n", + "LOGSTART 17 log2\nLOGPART log2\nanother line\n", }, }, { - Name: "LogsWithoutFlusher", + Name: "NoMatch", Pattern: `^LOGSTART \d+`, Input: []byte("LOGPART log1\nLOGPART log1\t \n"), }, } for _, tc := range testCases { - cfg := Config{LineStartPattern: tc.Pattern} - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - }.Func() - splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc) + cfg := &Config{LineStartPattern: tc.Pattern} + splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trim.Nop) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } @@ -288,7 +291,7 @@ func TestLineEndSplitFunc(t *testing.T) { Input: []byte("log1 LOGEND LOGEND\nlog2 LOGEND\n"), ExpectedTokens: []string{ "log1 LOGEND LOGEND", - "log2 LOGEND", + "\nlog2 LOGEND", }, }, { @@ -339,16 +342,16 @@ func TestLineEndSplitFunc(t *testing.T) { ExpectedError: errors.New("bufio.Scanner: token too long"), }, { - Name: "MultipleMultilineLogs", + Name: "MultiplesplitLogs", Pattern: `^LOGEND.*$`, Input: []byte("LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t \nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2\nLOGSTART 43 log5"), ExpectedTokens: []string{ - "LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1", - "LOGSTART 17 log2\nLOGPART log2\nLOGEND log2", + "LOGSTART 12 log1\t \nLOGPART log1\nLOGEND log1\t ", + "\nLOGSTART 17 log2\nLOGPART log2\nLOGEND log2", }, }, { - Name: "LogsWithoutFlusher", + Name: "NoMatch", Pattern: `^LOGEND.*$`, Input: []byte("LOGPART log1\nLOGPART log1\t \n"), }, @@ -356,12 +359,7 @@ func TestLineEndSplitFunc(t *testing.T) { for _, tc := range testCases { cfg := Config{LineEndPattern: tc.Pattern} - - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - }.Func() - splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trimFunc) + splitFunc, err := cfg.Func(unicode.UTF8, false, 0, trim.Nop) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } @@ -484,7 +482,7 @@ func TestNewlineSplitFunc(t *testing.T) { Input: []byte("LOGPART log1"), }, { - Name: "DefaultFlusherSplits", + Name: "DefaultSplits", Input: []byte("log1\nlog2\n"), ExpectedTokens: []string{ "log1", @@ -499,42 +497,10 @@ func TestNewlineSplitFunc(t *testing.T) { "LOGEND 333", }, }, - { - Name: "PreserveLeadingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - " LOGEND 333", - }, - PreserveLeadingWhitespaces: true, - }, - { - Name: "PreserveTrailingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - "LOGEND 333 ", - }, - PreserveTrailingWhitespaces: true, - }, - { - Name: "PreserveBothLeadingAndTrailingWhitespaces", - Input: []byte("\n LOGEND 333 \nAnother one "), - ExpectedTokens: []string{ - "", - " LOGEND 333 ", - }, - PreserveLeadingWhitespaces: true, - PreserveTrailingWhitespaces: true, - }, } for _, tc := range testCases { - trimFunc := trim.Config{ - PreserveLeading: tc.PreserveLeadingWhitespaces, - PreserveTrailing: tc.PreserveTrailingWhitespaces, - }.Func() - splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trimFunc) + splitFunc, err := NewlineSplitFunc(unicode.UTF8, false, trim.Nop) require.NoError(t, err) t.Run(tc.Name, tc.Run(splitFunc)) } @@ -623,6 +589,7 @@ func TestNoSplitFunc(t *testing.T) { "stuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmn", }, }, + { Name: "EOFBeforeMaxLogSize", Input: func() []byte { @@ -645,12 +612,10 @@ func TestNoSplitFunc(t *testing.T) { func TestNoopEncodingError(t *testing.T) { endCfg := Config{LineEndPattern: "\n"} - _, err := endCfg.Func(encoding.Nop, false, 0, trim.Nop) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) startCfg := Config{LineStartPattern: "\n"} - _, err = startCfg.Func(encoding.Nop, false, 0, trim.Nop) require.Equal(t, err, fmt.Errorf("line_start_pattern or line_end_pattern should not be set when using nop encoding")) } @@ -689,6 +654,7 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { {0, 108, 0, 111, 0, 103, 0, 50}, // log2 }, }, + { "MultiCarriageReturnUTF16", unicode.UTF16(unicode.BigEndian, unicode.IgnoreBOM), @@ -732,3 +698,26 @@ func TestNewlineSplitFunc_Encodings(t *testing.T) { }) } } + +// TODO the only error case is when an encoding requires more than 10 +// bytes to represent a newline or carriage return. This is very unlikely to happen +// and doesn't seem to warrant error handling in more than one place. +// Therefore, move error case into decoder.LookupEncoding? +type errEncoding struct{} +type errEncoder struct{} + +func (e errEncoding) NewEncoder() *encoding.Encoder { + return &encoding.Encoder{ + Transformer: &errEncoder{}, + } +} + +func (e errEncoding) NewDecoder() *encoding.Decoder { + return &encoding.Decoder{} +} + +func (e errEncoder) Transform(_, _ []byte, _ bool) (int, int, error) { + return 0, 0, errors.New("strange encoding") +} + +func (e errEncoder) Reset() {} diff --git a/pkg/stanza/split/splittest/splittest.go b/pkg/stanza/split/splittest/splittest.go index 1ea3ebc2c3caf..d2f7ee3dc8802 100644 --- a/pkg/stanza/split/splittest/splittest.go +++ b/pkg/stanza/split/splittest/splittest.go @@ -71,15 +71,13 @@ func (r *testReader) splitFunc(split bufio.SplitFunc) bufio.SplitFunc { } type TestCase struct { - Name string - Pattern string - Input []byte - ExpectedTokens []string - ExpectedError error - Sleep time.Duration - AdditionalIterations int - PreserveLeadingWhitespaces bool - PreserveTrailingWhitespaces bool + Name string + Pattern string + Input []byte + ExpectedTokens []string + ExpectedError error + Sleep time.Duration + AdditionalIterations int } func (tc TestCase) Run(split bufio.SplitFunc) func(t *testing.T) {