From 3a20261b718d8c84cd0e707c33b292d45565596d Mon Sep 17 00:00:00 2001 From: Michael Graff Date: Fri, 11 Apr 2025 01:08:56 -0500 Subject: [PATCH 01/14] detect snappy compression type --- config/confighttp/compression.go | 44 +++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 62638e0c7be..0551e2fb816 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -6,6 +6,7 @@ package confighttp // import "go.opentelemetry.io/collector/config/confighttp" import ( + "bufio" "bytes" "compress/gzip" "compress/zlib" @@ -27,6 +28,46 @@ type compressRoundTripper struct { compressor *compressor } +// snappyFramingHeader is always the first 10 bytes of a snappy framed stream. +var snappyFramingHeader = []byte{ + 0xff, 0x06, 0x00, 0x00, + 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59, // "sNaPpY" +} + +// snappyHandler returns an io.ReadCloser that auto-detects the snappy format. +// This is necessary because the collector previously used "content-encoding: snappy" +// but decompressed and compressed the payloads using the snappy framing format. +// However, "content-encoding: snappy" is uses the block format, and "x-snappy-framed" +// is the framing format. This handler is a (hopefully temporary) hack to +// make this work in a backwards-compatible way. +func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { + br := bufio.NewReader(body) + + peekBytes, err := br.Peek(len(snappyFramingHeader)) + if err != nil && err != io.EOF { + return nil, err + } + + isFramed := len(peekBytes) >= len(snappyFramingHeader) && bytes.Equal(peekBytes[:len(snappyFramingHeader)], snappyFramingHeader) + + if isFramed { + return &compressReadCloser{ + Reader: snappy.NewReader(br), + orig: body, + }, nil + } else { + compressed, err := io.ReadAll(br) + if err != nil { + return nil, err + } + decoded, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + return io.NopCloser(bytes.NewReader(decoded)), nil + } +} + var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ "": func(io.ReadCloser) (io.ReadCloser, error) { // Not a compressed payload. Nothing to do. @@ -60,8 +101,9 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro } return zr, nil }, + "snappy": snappyHandler, //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature - "snappy": func(body io.ReadCloser) (io.ReadCloser, error) { + "x-snappy-framed": func(body io.ReadCloser) (io.ReadCloser, error) { // Lazy Reading content to improve memory efficiency return &compressReadCloser{ Reader: snappy.NewReader(body), From 95b9eee7c9c82a117ff220145b9a4aa7fdcf90d8 Mon Sep 17 00:00:00 2001 From: Michael Graff Date: Fri, 11 Apr 2025 01:38:34 -0500 Subject: [PATCH 02/14] fully add x-snappy-framed --- config/configcompression/compressiontype.go | 2 + .../configcompression/compressiontype_test.go | 17 +++++++ config/confighttp/compression_test.go | 46 +++++++++++++++++-- config/confighttp/compressor.go | 40 +++++++++++++++- config/confighttp/confighttp.go | 2 +- 5 files changed, 102 insertions(+), 5 deletions(-) diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index f8a7a957f70..74a6b4efa72 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -24,6 +24,7 @@ const ( TypeZlib Type = "zlib" TypeDeflate Type = "deflate" TypeSnappy Type = "snappy" + TypeSnappyFramed Type = "x-snappy-framed" TypeZstd Type = "zstd" TypeLz4 Type = "lz4" typeNone Type = "none" @@ -43,6 +44,7 @@ func (ct *Type) UnmarshalText(in []byte) error { typ == TypeZlib || typ == TypeDeflate || typ == TypeSnappy || + typ == TypeSnappyFramed || typ == TypeZstd || typ == TypeLz4 || typ == typeNone || diff --git a/config/configcompression/compressiontype_test.go b/config/configcompression/compressiontype_test.go index 2fb9976eb4e..8225c880639 100644 --- a/config/configcompression/compressiontype_test.go +++ b/config/configcompression/compressiontype_test.go @@ -42,6 +42,12 @@ func TestUnmarshalText(t *testing.T) { shouldError: false, isCompressed: true, }, + { + name: "ValidSnappyFramed", + compressionName: []byte("x-snappy-framed"), + shouldError: false, + isCompressed: true, + }, { name: "ValidZstd", compressionName: []byte("zstd"), @@ -128,6 +134,17 @@ func TestValidateParams(t *testing.T) { compressionLevel: 1, shouldError: true, }, + { + name: "ValidSnappyFramed", + compressionName: []byte("x-snappy-framed"), + shouldError: false, + }, + { + name: "InvalidSnappyFramed", + compressionName: []byte("x-snappy-framed"), + compressionLevel: 1, + shouldError: true, + }, { name: "ValidZstd", compressionName: []byte("zstd"), diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index 4c41e8085ff..e31fb3ceab7 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -32,6 +32,7 @@ func TestHTTPClientCompression(t *testing.T) { compressedGzipBody := compressGzip(t, testBody) compressedZlibBody := compressZlib(t, testBody) compressedDeflateBody := compressZlib(t, testBody) + compressedSnappyFramedBody := compressSnappyFramed(t, testBody) compressedSnappyBody := compressSnappy(t, testBody) compressedZstdBody := compressZstd(t, testBody) compressedLz4Body := compressLz4(t, testBody) @@ -111,6 +112,19 @@ func TestHTTPClientCompression(t *testing.T) { reqBody: compressedSnappyBody.Bytes(), shouldError: true, }, + { + name: "ValidSnappyFramed", + encoding: configcompression.TypeSnappyFramed, + reqBody: compressedSnappyFramedBody.Bytes(), + shouldError: false, + }, + { + name: "InvalidSnappyFramed", + encoding: configcompression.TypeSnappyFramed, + level: gzip.DefaultCompression, + reqBody: compressedSnappyFramedBody.Bytes(), + shouldError: true, + }, { name: "ValidZstd", encoding: configcompression.TypeZstd, @@ -250,6 +264,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { reqBody: compressZstd(t, testBody), respCode: http.StatusOK, }, + { + name: "ValidSnappyFramed", + encoding: "x-snappy-framed", + reqBody: compressSnappyFramed(t, testBody), + respCode: http.StatusOK, + }, { name: "ValidSnappy", encoding: "snappy", @@ -290,12 +310,19 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { respCode: http.StatusBadRequest, respBody: "invalid input: magic number mismatch", }, + { + name: "InvalidSnappyFramed", + encoding: "x-snappy-framed", + reqBody: bytes.NewBuffer(testBody), + respCode: http.StatusBadRequest, + respBody: "snappy: corrupt input", + }, { name: "InvalidSnappy", encoding: "snappy", reqBody: bytes.NewBuffer(testBody), respCode: http.StatusBadRequest, - respBody: "snappy: corrupt input", + respBody: "snappy: corrupt input\n", }, { name: "UnsupportedCompression", @@ -415,7 +442,7 @@ func TestOverrideCompressionList(t *testing.T) { }), defaultMaxRequestBodySize, defaultErrorHandler, configuredDecoders, nil)) t.Cleanup(srv.Close) - req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappy(t, []byte("123decompressed body"))) + req, err := http.NewRequest(http.MethodGet, srv.URL, compressSnappyFramed(t, []byte("123decompressed body"))) require.NoError(t, err, "failed to create request to test handler") req.Header.Set("Content-Encoding", "snappy") @@ -456,6 +483,11 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { encoding: "zlib", compress: compressZlib, }, + { + name: "x-snappy-framed", + encoding: "x-snappy-framed", + compress: compressSnappyFramed, + }, { name: "snappy", encoding: "snappy", @@ -517,7 +549,7 @@ func compressZlib(tb testing.TB, body []byte) *bytes.Buffer { return &buf } -func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer { +func compressSnappyFramed(tb testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer sw := snappy.NewBufferedWriter(&buf) _, err := sw.Write(body) @@ -526,6 +558,14 @@ func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer { return &buf } +func compressSnappy(tb testing.TB, body []byte) *bytes.Buffer { + var buf bytes.Buffer + compressed := snappy.Encode(nil, body) + _, err := buf.Write(compressed) + require.NoError(tb, err) + return &buf +} + func compressZstd(tb testing.TB, body []byte) *bytes.Buffer { var buf bytes.Buffer compression := zstd.SpeedFastest diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 750bda5795d..5cc9e6f681f 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -66,10 +66,14 @@ func newWriteCloserResetFunc(compressionType configcompression.Type, compression w, _ := gzip.NewWriterLevel(nil, int(compressionParams.Level)) return w }, nil - case configcompression.TypeSnappy: + case configcompression.TypeSnappyFramed: return func() writeCloserReset { return snappy.NewBufferedWriter(nil) }, nil + case configcompression.TypeSnappy: + return func() writeCloserReset { + return &rawSnappyWriter{} + }, nil case configcompression.TypeZstd: level := zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(int(compressionParams.Level))) return func() writeCloserReset { @@ -111,3 +115,37 @@ func (p *compressor) compress(buf *bytes.Buffer, body io.ReadCloser) error { return writer.Close() } + +// rawSnappyWriter buffers all writes and, on Close, +// compresses the data as a raw snappy block (non-framed) +// and writes the compressed bytes to the underlying writer. +type rawSnappyWriter struct { + buffer bytes.Buffer + w io.Writer + closed bool +} + +// Write buffers the data. +func (w *rawSnappyWriter) Write(p []byte) (int, error) { + return w.buffer.Write(p) +} + +// Close compresses the buffered data in one shot using snappy.Encode, +// writes the compressed block to the underlying writer, and marks the writer as closed. +func (w *rawSnappyWriter) Close() error { + if w.closed { + return nil + } + w.closed = true + // Compress the buffered uncompressed bytes. + compressed := snappy.Encode(nil, w.buffer.Bytes()) + _, err := w.w.Write(compressed) + return err +} + +// Reset sets a new underlying writer, resets the buffer and the closed flag. +func (w *rawSnappyWriter) Reset(newWriter io.Writer) { + w.buffer.Reset() + w.w = newWriter + w.closed = false +} diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 31735c9e9a3..51eec74fe2e 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -38,7 +38,7 @@ const ( defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB ) -var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"} +var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4", "x-snappy-framed"} // ClientConfig defines settings for creating an HTTP client. type ClientConfig struct { From 00b1875216e27635a2b811173dd489ef16466cc5 Mon Sep 17 00:00:00 2001 From: Michael Graff Date: Fri, 11 Apr 2025 01:52:38 -0500 Subject: [PATCH 03/14] add test for snappy-but-framed-body --- config/confighttp/compression_test.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index e31fb3ceab7..b3e7f950b7b 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -276,6 +276,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { reqBody: compressSnappy(t, testBody), respCode: http.StatusOK, }, + { + name: "ValidSnappyFramedAsSnappy", + encoding: "snappy", + reqBody: compressSnappyFramed(t, testBody), + respCode: http.StatusOK, + }, { name: "ValidLz4", encoding: "lz4", From 43dc2475c3e2603f9c45c141e1351d6634ea9005 Mon Sep 17 00:00:00 2001 From: Michael Graff Date: Fri, 11 Apr 2025 04:54:54 -0500 Subject: [PATCH 04/14] address lint issue --- config/confighttp/compression.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 0551e2fb816..62f7ad32eef 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -55,17 +55,16 @@ func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { Reader: snappy.NewReader(br), orig: body, }, nil - } else { - compressed, err := io.ReadAll(br) - if err != nil { - return nil, err - } - decoded, err := snappy.Decode(nil, compressed) - if err != nil { - return nil, err - } - return io.NopCloser(bytes.NewReader(decoded)), nil } + compressed, err := io.ReadAll(br) + if err != nil { + return nil, err + } + decoded, err := snappy.Decode(nil, compressed) + if err != nil { + return nil, err + } + return io.NopCloser(bytes.NewReader(decoded)), nil } var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ From 667379581874467165ae22bfe50719a0095ed6a2 Mon Sep 17 00:00:00 2001 From: Michael Graff Date: Fri, 11 Apr 2025 05:13:06 -0500 Subject: [PATCH 05/14] add changelog --- .chloggen/snappy-done-right.yaml | 37 ++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 .chloggen/snappy-done-right.yaml diff --git a/.chloggen/snappy-done-right.yaml b/.chloggen/snappy-done-right.yaml new file mode 100644 index 00000000000..08598f40703 --- /dev/null +++ b/.chloggen/snappy-done-right.yaml @@ -0,0 +1,37 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: confighttp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Fix handling of `snappy` content-encoding in a backwards-compatible way" + +# One or more tracking issues or pull requests related to the change +issues: [10584, 12825] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The collector used the Snappy compression type of "framed" to handle the HTTP + content-encoding "snappy". However, this encoding is typically used to indicate + the "block" compression variant of "snappy". This change allows the collector to: + - The server endpoints will now accept "x-snappy-framed" as a valid + content-encoding. + - Client compression type "snappy" will now compress to the "block" variant of snappy + instead of "framed". If you want the old behavior, you can set the + compression type to "x-snappy-framed". + - When a server endpoint receives a content-encoding of "snappy", it will + look at the first bytes of the payload to determine if it is "framed" or "block" snappy, + and will decompress accordingly. This is a backwards-compatible change. + - In a future release, this checking behavior may be removed. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] From f852a02b85abe26adacb66feb09fec3646f2c341 Mon Sep 17 00:00:00 2001 From: Michael Graff Date: Fri, 11 Apr 2025 05:16:59 -0500 Subject: [PATCH 06/14] lint-again --- config/confighttp/compression.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 62f7ad32eef..e6de8e1b0c9 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -10,6 +10,7 @@ import ( "bytes" "compress/gzip" "compress/zlib" + "errors" "fmt" "io" "net/http" @@ -44,7 +45,7 @@ func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { br := bufio.NewReader(body) peekBytes, err := br.Peek(len(snappyFramingHeader)) - if err != nil && err != io.EOF { + if err != nil && !errors.Is(err, io.EOF) { return nil, err } From 69ded1213682eeaff7deb0e9ec9b0fa08b606829 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Tue, 22 Apr 2025 17:58:57 -0300 Subject: [PATCH 07/14] Add feature-flag to enable/disable framed snappy in client side. Signed-off-by: Arthur Silva Sens --- .chloggen/snappy-done-right.yaml | 17 ++-- config/confighttp/compression.go | 114 +++++++++++++++----------- config/confighttp/compression_test.go | 93 ++++++++++++--------- config/confighttp/compressor.go | 12 +++ config/confighttp/confighttp.go | 9 +- config/confighttp/go.mod | 2 +- 6 files changed, 149 insertions(+), 98 deletions(-) diff --git a/.chloggen/snappy-done-right.yaml b/.chloggen/snappy-done-right.yaml index 08598f40703..0dbe63e9214 100644 --- a/.chloggen/snappy-done-right.yaml +++ b/.chloggen/snappy-done-right.yaml @@ -1,7 +1,7 @@ # Use this changelog template to create an entry for release notes. # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: enhancement +change_type: bug_fix # The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) component: confighttp @@ -19,15 +19,14 @@ subtext: | The collector used the Snappy compression type of "framed" to handle the HTTP content-encoding "snappy". However, this encoding is typically used to indicate the "block" compression variant of "snappy". This change allows the collector to: - - The server endpoints will now accept "x-snappy-framed" as a valid - content-encoding. - - Client compression type "snappy" will now compress to the "block" variant of snappy - instead of "framed". If you want the old behavior, you can set the - compression type to "x-snappy-framed". - - When a server endpoint receives a content-encoding of "snappy", it will - look at the first bytes of the payload to determine if it is "framed" or "block" snappy, + - When receiving a request with encoding 'snappy', the server endpoints will peek + at the first bytes of the payload to determine if it is "framed" or "block" snappy, and will decompress accordingly. This is a backwards-compatible change. - - In a future release, this checking behavior may be removed. + + If the feature-gate "confighttp.framedSnappy" is enabled, you'll see new behavior for both client and server: + - Client compression type "snappy" will now compress to the "block" variant of snappy + instead of "framed". Client compression type "x-snappy-framed" will now compress to the "framed" variant of snappy. + - Servers will accept both "snappy" and "x-snappy-framed" as valid content-encodings. # Optional: The change log or logs in which this entry should be included. # e.g. '[user]' or '[user, api]' diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index e6de8e1b0c9..5906f668e38 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -20,6 +20,15 @@ import ( "github.com/pierrec/lz4/v4" "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/featuregate" +) + +var enableFramedSnappy = featuregate.GlobalRegistry().MustRegister( + "confighttp.framedSnappy", + featuregate.StageAlpha, + featuregate.WithRegisterFromVersion("v0.125.0"), + featuregate.WithRegisterDescription("Content encoding 'snappy' will compress/decompress block snappy format while 'x-snappy-framed' will compress/decompress framed snappy format."), + featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector/issues/10584"), ) type compressRoundTripper struct { @@ -68,55 +77,63 @@ func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { return io.NopCloser(bytes.NewReader(decoded)), nil } -var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ - "": func(io.ReadCloser) (io.ReadCloser, error) { - // Not a compressed payload. Nothing to do. - return nil, nil - }, - "gzip": func(body io.ReadCloser) (io.ReadCloser, error) { - gr, err := gzip.NewReader(body) - if err != nil { - return nil, err - } - return gr, nil - }, - "zstd": func(body io.ReadCloser) (io.ReadCloser, error) { - zr, err := zstd.NewReader( - body, - // Concurrency 1 disables async decoding. We don't need async decoding, it is pointless - // for our use-case (a server accepting decoding http requests). - // Disabling async improves performance (I benchmarked it previously when working - // on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257). - zstd.WithDecoderConcurrency(1), - ) - if err != nil { - return nil, err - } - return zr.IOReadCloser(), nil - }, - "zlib": func(body io.ReadCloser) (io.ReadCloser, error) { - zr, err := zlib.NewReader(body) - if err != nil { - return nil, err +func availableDecoders() map[string]func(body io.ReadCloser) (io.ReadCloser, error) { + var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ + "": func(io.ReadCloser) (io.ReadCloser, error) { + // Not a compressed payload. Nothing to do. + return nil, nil + }, + "gzip": func(body io.ReadCloser) (io.ReadCloser, error) { + gr, err := gzip.NewReader(body) + if err != nil { + return nil, err + } + return gr, nil + }, + "zstd": func(body io.ReadCloser) (io.ReadCloser, error) { + zr, err := zstd.NewReader( + body, + // Concurrency 1 disables async decoding. We don't need async decoding, it is pointless + // for our use-case (a server accepting decoding http requests). + // Disabling async improves performance (I benchmarked it previously when working + // on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257). + zstd.WithDecoderConcurrency(1), + ) + if err != nil { + return nil, err + } + return zr.IOReadCloser(), nil + }, + "zlib": func(body io.ReadCloser) (io.ReadCloser, error) { + zr, err := zlib.NewReader(body) + if err != nil { + return nil, err + } + return zr, nil + }, + "snappy": snappyHandler, + + //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature + "lz4": func(body io.ReadCloser) (io.ReadCloser, error) { + return &compressReadCloser{ + Reader: lz4.NewReader(body), + orig: body, + }, nil + }, + } + + if enableFramedSnappy.IsEnabled() { + //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature + availableDecoders["x-snappy-framed"] = func(body io.ReadCloser) (io.ReadCloser, error) { + // Lazy Reading content to improve memory efficiency + return &compressReadCloser{ + Reader: snappy.NewReader(body), + orig: body, + }, nil } - return zr, nil - }, - "snappy": snappyHandler, - //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature - "x-snappy-framed": func(body io.ReadCloser) (io.ReadCloser, error) { - // Lazy Reading content to improve memory efficiency - return &compressReadCloser{ - Reader: snappy.NewReader(body), - orig: body, - }, nil - }, - //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature - "lz4": func(body io.ReadCloser) (io.ReadCloser, error) { - return &compressReadCloser{ - Reader: lz4.NewReader(body), - orig: body, - }, nil - }, + } + + return availableDecoders } func newCompressionParams(level configcompression.Level) configcompression.CompressionParams { @@ -183,6 +200,7 @@ func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w errHandler = eh } + availableDecoders := availableDecoders() enabled := map[string]func(body io.ReadCloser) (io.ReadCloser, error){} for _, dec := range enableDecoders { enabled[dec] = availableDecoders[dec] diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index b3e7f950b7b..3b4724acd25 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configcompression" + "go.opentelemetry.io/collector/featuregate" ) func TestHTTPClientCompression(t *testing.T) { @@ -40,11 +41,12 @@ func TestHTTPClientCompression(t *testing.T) { const invalidGzipLevel configcompression.Level = 100 tests := []struct { - name string - encoding configcompression.Type - level configcompression.Level - reqBody []byte - shouldError bool + name string + encoding configcompression.Type + level configcompression.Level + framedSnappyEnabled bool + reqBody []byte + shouldError bool }{ { name: "ValidEmpty", @@ -100,10 +102,11 @@ func TestHTTPClientCompression(t *testing.T) { shouldError: false, }, { - name: "ValidSnappy", - encoding: configcompression.TypeSnappy, - reqBody: compressedSnappyBody.Bytes(), - shouldError: false, + name: "ValidSnappy", + encoding: configcompression.TypeSnappy, + framedSnappyEnabled: true, + reqBody: compressedSnappyBody.Bytes(), + shouldError: false, }, { name: "InvalidSnappy", @@ -113,10 +116,11 @@ func TestHTTPClientCompression(t *testing.T) { shouldError: true, }, { - name: "ValidSnappyFramed", - encoding: configcompression.TypeSnappyFramed, - reqBody: compressedSnappyFramedBody.Bytes(), - shouldError: false, + name: "ValidSnappyFramed", + encoding: configcompression.TypeSnappyFramed, + framedSnappyEnabled: true, + reqBody: compressedSnappyFramedBody.Bytes(), + shouldError: false, }, { name: "InvalidSnappyFramed", @@ -148,6 +152,8 @@ func TestHTTPClientCompression(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tt.framedSnappyEnabled) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) assert.NoError(t, err, "failed to read request body: %v", err) @@ -207,7 +213,7 @@ func TestHTTPCustomDecompression(t *testing.T) { return io.NopCloser(strings.NewReader("decompressed body")), nil }, } - srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, decoders)) + srv := httptest.NewServer(httpContentDecompressor(handler, defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms(), decoders)) t.Cleanup(srv.Close) @@ -228,11 +234,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { testBody := []byte("uncompressed_text") noDecoders := map[string]func(io.ReadCloser) (io.ReadCloser, error){} tests := []struct { - name string - encoding string - reqBody *bytes.Buffer - respCode int - respBody string + name string + encoding string + reqBody *bytes.Buffer + respCode int + respBody string + framedSnappyEnabled bool }{ { name: "NoCompression", @@ -265,10 +272,11 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { respCode: http.StatusOK, }, { - name: "ValidSnappyFramed", - encoding: "x-snappy-framed", - reqBody: compressSnappyFramed(t, testBody), - respCode: http.StatusOK, + name: "ValidSnappyFramed", + encoding: "x-snappy-framed", + framedSnappyEnabled: true, + reqBody: compressSnappyFramed(t, testBody), + respCode: http.StatusOK, }, { name: "ValidSnappy", @@ -277,6 +285,9 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { respCode: http.StatusOK, }, { + // Should work even without the framed snappy feature gate enabled, + // since during decompression we're peeking the compression header + // and identifying which snappy encoding was used. name: "ValidSnappyFramedAsSnappy", encoding: "snappy", reqBody: compressSnappyFramed(t, testBody), @@ -317,11 +328,12 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { respBody: "invalid input: magic number mismatch", }, { - name: "InvalidSnappyFramed", - encoding: "x-snappy-framed", - reqBody: bytes.NewBuffer(testBody), - respCode: http.StatusBadRequest, - respBody: "snappy: corrupt input", + name: "InvalidSnappyFramed", + encoding: "x-snappy-framed", + framedSnappyEnabled: true, + reqBody: bytes.NewBuffer(testBody), + respCode: http.StatusBadRequest, + respBody: "snappy: corrupt input", }, { name: "InvalidSnappy", @@ -340,6 +352,8 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tt.framedSnappyEnabled) + srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) if err != nil { @@ -351,7 +365,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { assert.NoError(t, err, "failed to read request body: %v", err) assert.EqualValues(t, testBody, string(body)) w.WriteHeader(http.StatusOK) - }), defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms, noDecoders)) + }), defaultMaxRequestBodySize, defaultErrorHandler, defaultCompressionAlgorithms(), noDecoders)) t.Cleanup(srv.Close) req, err := http.NewRequest(http.MethodGet, srv.URL, tt.reqBody) @@ -468,9 +482,10 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { t.Parallel() for _, tc := range []struct { - name string - encoding string - compress func(tb testing.TB, payload []byte) *bytes.Buffer + name string + encoding string + compress func(tb testing.TB, payload []byte) *bytes.Buffer + framedSnappyEnabled bool }{ // None encoding is ignored since it does not // enforce the max body size if content encoding header is not set @@ -490,9 +505,10 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { compress: compressZlib, }, { - name: "x-snappy-framed", - encoding: "x-snappy-framed", - compress: compressSnappyFramed, + name: "x-snappy-framed", + encoding: "x-snappy-framed", + compress: compressSnappyFramed, + framedSnappyEnabled: true, }, { name: "snappy", @@ -506,7 +522,8 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { }, } { t.Run(tc.name, func(t *testing.T) { - t.Parallel() + // t.Parallel() // TODO: Re-enable parallel tests once feature gate is removed. We can't parallelize since registry is shared. + featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tc.framedSnappyEnabled) h := httpContentDecompressor( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -517,8 +534,8 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { }), 1024, defaultErrorHandler, - defaultCompressionAlgorithms, - availableDecoders, + defaultCompressionAlgorithms(), + availableDecoders(), ) payload := tc.compress(t, make([]byte, 2*1024)) // 2KB uncompressed payload diff --git a/config/confighttp/compressor.go b/config/confighttp/compressor.go index 5cc9e6f681f..70ad7227a1a 100644 --- a/config/confighttp/compressor.go +++ b/config/confighttp/compressor.go @@ -67,11 +67,23 @@ func newWriteCloserResetFunc(compressionType configcompression.Type, compression return w }, nil case configcompression.TypeSnappyFramed: + if !enableFramedSnappy.IsEnabled() { + return nil, errors.New("x-snappy-framed is not enabled") + } return func() writeCloserReset { return snappy.NewBufferedWriter(nil) }, nil case configcompression.TypeSnappy: + if !enableFramedSnappy.IsEnabled() { + // If framed snappy feature gate is not enabled, we keep the current behavior + // where the 'Content-Encoding: snappy' is compressed as the framed snappy format. + return func() writeCloserReset { + return snappy.NewBufferedWriter(nil) + }, nil + } return func() writeCloserReset { + // If framed snappy feature gate is enabled, we use the correct behavior + // where the 'Content-Encoding: snappy' is compressed as the block snappy format. return &rawSnappyWriter{} }, nil case configcompression.TypeZstd: diff --git a/config/confighttp/confighttp.go b/config/confighttp/confighttp.go index 51eec74fe2e..9b91eb68be4 100644 --- a/config/confighttp/confighttp.go +++ b/config/confighttp/confighttp.go @@ -38,7 +38,12 @@ const ( defaultMaxRequestBodySize = 20 * 1024 * 1024 // 20MiB ) -var defaultCompressionAlgorithms = []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4", "x-snappy-framed"} +func defaultCompressionAlgorithms() []string { + if enableFramedSnappy.IsEnabled() { + return []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4", "x-snappy-framed"} + } + return []string{"", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"} +} // ClientConfig defines settings for creating an HTTP client. type ClientConfig struct { @@ -450,7 +455,7 @@ func (hss *ServerConfig) ToServer(ctx context.Context, host component.Host, sett } if hss.CompressionAlgorithms == nil { - hss.CompressionAlgorithms = defaultCompressionAlgorithms + hss.CompressionAlgorithms = defaultCompressionAlgorithms() } // Apply middlewares in reverse order so they execute in diff --git a/config/confighttp/go.mod b/config/confighttp/go.mod index ef034e88402..c8c2c415eae 100644 --- a/config/confighttp/go.mod +++ b/config/confighttp/go.mod @@ -21,6 +21,7 @@ require ( go.opentelemetry.io/collector/extension/extensionauth/extensionauthtest v0.125.0 go.opentelemetry.io/collector/extension/extensionmiddleware v0.125.0 go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest v0.125.0 + go.opentelemetry.io/collector/featuregate v1.31.0 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 go.opentelemetry.io/otel v1.35.0 go.uber.org/goleak v1.3.0 @@ -41,7 +42,6 @@ require ( github.com/hashicorp/go-version v1.7.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/collector/featuregate v1.31.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.125.0 // indirect go.opentelemetry.io/collector/pdata v1.31.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.10.0 // indirect From fddb53c68816f011b50b87ba208c7888de5dda8f Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Tue, 22 Apr 2025 19:19:46 -0300 Subject: [PATCH 08/14] lint Signed-off-by: Arthur Silva Sens --- config/confighttp/compression.go | 2 +- config/confighttp/compression_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 5906f668e38..f9237b59230 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -78,7 +78,7 @@ func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { } func availableDecoders() map[string]func(body io.ReadCloser) (io.ReadCloser, error) { - var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ + availableDecoders := map[string]func(body io.ReadCloser) (io.ReadCloser, error){ "": func(io.ReadCloser) (io.ReadCloser, error) { // Not a compressed payload. Nothing to do. return nil, nil diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index 3b4724acd25..e59eb0efe5e 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -152,7 +152,7 @@ func TestHTTPClientCompression(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tt.framedSnappyEnabled) + _ = featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tt.framedSnappyEnabled) srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) @@ -352,7 +352,7 @@ func TestHTTPContentDecompressionHandler(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tt.framedSnappyEnabled) + _ = featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tt.framedSnappyEnabled) srv := httptest.NewServer(httpContentDecompressor(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) @@ -523,7 +523,7 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { // t.Parallel() // TODO: Re-enable parallel tests once feature gate is removed. We can't parallelize since registry is shared. - featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tc.framedSnappyEnabled) + _ = featuregate.GlobalRegistry().Set(enableFramedSnappy.ID(), tc.framedSnappyEnabled) h := httpContentDecompressor( http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { From ee25bda1259b16f8f996ec1dbd5924aea526e2f7 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Sat, 26 Apr 2025 15:59:37 -0300 Subject: [PATCH 09/14] Update confighttp README Signed-off-by: Arthur Silva Sens --- config/confighttp/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/config/confighttp/README.md b/config/confighttp/README.md index 1ee56939814..20abc5dbaf0 100644 --- a/config/confighttp/README.md +++ b/config/confighttp/README.md @@ -48,6 +48,8 @@ README](../configtls/README.md). - SpeedBestCompression: `11` - `snappy` No compression levels supported yet + - `x-snappy-framed` (When feature gate `confighttp.framedSnappy` is enabled) + No compression levels supported yet - [`max_idle_conns`](https://golang.org/pkg/net/http/#Transport) - [`max_idle_conns_per_host`](https://golang.org/pkg/net/http/#Transport) - [`max_conns_per_host`](https://golang.org/pkg/net/http/#Transport) @@ -105,6 +107,7 @@ will not be enabled. - `endpoint`: Valid value syntax available [here](https://github.com/grpc/grpc/blob/master/doc/naming.md) - `max_request_body_size`: configures the maximum allowed body size in bytes for a single request. Default: `20971520` (20MiB) - `compression_algorithms`: configures the list of compression algorithms the server can accept. Default: ["", "gzip", "zstd", "zlib", "snappy", "deflate", "lz4"] + - `x-snappy-framed` can be used if feature gate `confighttp.snappyFramed` is enabled. - [`tls`](../configtls/README.md) - [`auth`](../configauth/README.md) - `request_params`: a list of query parameter names to add to the auth context, along with the HTTP headers From b520d5888401499e2c0ce29eae0e4a6d32dc94fe Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 28 Apr 2025 14:38:58 -0300 Subject: [PATCH 10/14] Add comments referencing Snappy Framed docs Signed-off-by: Arthur Silva Sens --- config/confighttp/compression.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index f9237b59230..c8cb09d24c0 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -50,6 +50,9 @@ var snappyFramingHeader = []byte{ // However, "content-encoding: snappy" is uses the block format, and "x-snappy-framed" // is the framing format. This handler is a (hopefully temporary) hack to // make this work in a backwards-compatible way. +// +// See https://github.com/google/snappy/blob/6af9287fbdb913f0794d0148c6aa43b58e63c8e3/framing_format.txt#L27-L36 +// for more details on the framing format. func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { br := bufio.NewReader(body) From f6161575797be863661a51450ca5b84b57d9abfb Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 28 Apr 2025 14:42:58 -0300 Subject: [PATCH 11/14] Only allocate availableDecoders once Signed-off-by: Arthur Silva Sens --- config/confighttp/compression.go | 15 ++++++++++----- config/confighttp/compression_test.go | 2 +- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index c8cb09d24c0..35e5ce2f2ea 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -80,8 +80,10 @@ func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { return io.NopCloser(bytes.NewReader(decoded)), nil } -func availableDecoders() map[string]func(body io.ReadCloser) (io.ReadCloser, error) { - availableDecoders := map[string]func(body io.ReadCloser) (io.ReadCloser, error){ +var availableDecoders map[string]func(body io.ReadCloser) (io.ReadCloser, error) + +func getAvailableDecoders() map[string]func(body io.ReadCloser) (io.ReadCloser, error) { + decoders := map[string]func(body io.ReadCloser) (io.ReadCloser, error){ "": func(io.ReadCloser) (io.ReadCloser, error) { // Not a compressed payload. Nothing to do. return nil, nil @@ -127,7 +129,7 @@ func availableDecoders() map[string]func(body io.ReadCloser) (io.ReadCloser, err if enableFramedSnappy.IsEnabled() { //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature - availableDecoders["x-snappy-framed"] = func(body io.ReadCloser) (io.ReadCloser, error) { + decoders["x-snappy-framed"] = func(body io.ReadCloser) (io.ReadCloser, error) { // Lazy Reading content to improve memory efficiency return &compressReadCloser{ Reader: snappy.NewReader(body), @@ -136,7 +138,7 @@ func availableDecoders() map[string]func(body io.ReadCloser) (io.ReadCloser, err } } - return availableDecoders + return decoders } func newCompressionParams(level configcompression.Level) configcompression.CompressionParams { @@ -203,7 +205,10 @@ func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w errHandler = eh } - availableDecoders := availableDecoders() + if availableDecoders == nil { + availableDecoders = getAvailableDecoders() + } + enabled := map[string]func(body io.ReadCloser) (io.ReadCloser, error){} for _, dec := range enableDecoders { enabled[dec] = availableDecoders[dec] diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index e59eb0efe5e..a1c5c72828b 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -535,7 +535,7 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { 1024, defaultErrorHandler, defaultCompressionAlgorithms(), - availableDecoders(), + getAvailableDecoders(), ) payload := tc.compress(t, make([]byte, 2*1024)) // 2KB uncompressed payload From 34ba732ff0464f692503c0aa414dfbadbe82a679 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Wed, 30 Apr 2025 14:23:01 -0300 Subject: [PATCH 12/14] Make 'availableDecoders' a const map again Signed-off-by: Arthur Silva Sens --- config/confighttp/compression.go | 119 ++++++++++++-------------- config/confighttp/compression_test.go | 2 +- 2 files changed, 55 insertions(+), 66 deletions(-) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 35e5ce2f2ea..49cf670522b 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -38,6 +38,57 @@ type compressRoundTripper struct { compressor *compressor } +var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, error){ + "": func(io.ReadCloser) (io.ReadCloser, error) { + // Not a compressed payload. Nothing to do. + return nil, nil + }, + "gzip": func(body io.ReadCloser) (io.ReadCloser, error) { + gr, err := gzip.NewReader(body) + if err != nil { + return nil, err + } + return gr, nil + }, + "zstd": func(body io.ReadCloser) (io.ReadCloser, error) { + zr, err := zstd.NewReader( + body, + // Concurrency 1 disables async decoding. We don't need async decoding, it is pointless + // for our use-case (a server accepting decoding http requests). + // Disabling async improves performance (I benchmarked it previously when working + // on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257). + zstd.WithDecoderConcurrency(1), + ) + if err != nil { + return nil, err + } + return zr.IOReadCloser(), nil + }, + "zlib": func(body io.ReadCloser) (io.ReadCloser, error) { + zr, err := zlib.NewReader(body) + if err != nil { + return nil, err + } + return zr, nil + }, + //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature + "snappy": snappyHandler, + //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature + "lz4": func(body io.ReadCloser) (io.ReadCloser, error) { + return &compressReadCloser{ + Reader: lz4.NewReader(body), + orig: body, + }, nil + }, + //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature + "x-snappy-framed": func(body io.ReadCloser) (io.ReadCloser, error) { + return &compressReadCloser{ + Reader: snappy.NewReader(body), + orig: body, + }, nil + }, +} + // snappyFramingHeader is always the first 10 bytes of a snappy framed stream. var snappyFramingHeader = []byte{ 0xff, 0x06, 0x00, 0x00, @@ -80,67 +131,6 @@ func snappyHandler(body io.ReadCloser) (io.ReadCloser, error) { return io.NopCloser(bytes.NewReader(decoded)), nil } -var availableDecoders map[string]func(body io.ReadCloser) (io.ReadCloser, error) - -func getAvailableDecoders() map[string]func(body io.ReadCloser) (io.ReadCloser, error) { - decoders := map[string]func(body io.ReadCloser) (io.ReadCloser, error){ - "": func(io.ReadCloser) (io.ReadCloser, error) { - // Not a compressed payload. Nothing to do. - return nil, nil - }, - "gzip": func(body io.ReadCloser) (io.ReadCloser, error) { - gr, err := gzip.NewReader(body) - if err != nil { - return nil, err - } - return gr, nil - }, - "zstd": func(body io.ReadCloser) (io.ReadCloser, error) { - zr, err := zstd.NewReader( - body, - // Concurrency 1 disables async decoding. We don't need async decoding, it is pointless - // for our use-case (a server accepting decoding http requests). - // Disabling async improves performance (I benchmarked it previously when working - // on https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/23257). - zstd.WithDecoderConcurrency(1), - ) - if err != nil { - return nil, err - } - return zr.IOReadCloser(), nil - }, - "zlib": func(body io.ReadCloser) (io.ReadCloser, error) { - zr, err := zlib.NewReader(body) - if err != nil { - return nil, err - } - return zr, nil - }, - "snappy": snappyHandler, - - //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature - "lz4": func(body io.ReadCloser) (io.ReadCloser, error) { - return &compressReadCloser{ - Reader: lz4.NewReader(body), - orig: body, - }, nil - }, - } - - if enableFramedSnappy.IsEnabled() { - //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature - decoders["x-snappy-framed"] = func(body io.ReadCloser) (io.ReadCloser, error) { - // Lazy Reading content to improve memory efficiency - return &compressReadCloser{ - Reader: snappy.NewReader(body), - orig: body, - }, nil - } - } - - return decoders -} - func newCompressionParams(level configcompression.Level) configcompression.CompressionParams { return configcompression.CompressionParams{ Level: level, @@ -205,12 +195,11 @@ func httpContentDecompressor(h http.Handler, maxRequestBodySize int64, eh func(w errHandler = eh } - if availableDecoders == nil { - availableDecoders = getAvailableDecoders() - } - enabled := map[string]func(body io.ReadCloser) (io.ReadCloser, error){} for _, dec := range enableDecoders { + if dec == "x-frame-snappy" && !enableFramedSnappy.IsEnabled() { + continue + } enabled[dec] = availableDecoders[dec] if dec == "deflate" { diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index a1c5c72828b..32db090f8b5 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -535,7 +535,7 @@ func TestDecompressorAvoidDecompressionBomb(t *testing.T) { 1024, defaultErrorHandler, defaultCompressionAlgorithms(), - getAvailableDecoders(), + availableDecoders, ) payload := tc.compress(t, make([]byte, 2*1024)) // 2KB uncompressed payload From 5aae0a48a2749fce4fd38009663762d3dd0f9548 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 2 May 2025 11:35:39 -0300 Subject: [PATCH 13/14] lint Signed-off-by: Arthur Silva Sens --- config/confighttp/compression.go | 1 - 1 file changed, 1 deletion(-) diff --git a/config/confighttp/compression.go b/config/confighttp/compression.go index 49cf670522b..afded024b2b 100644 --- a/config/confighttp/compression.go +++ b/config/confighttp/compression.go @@ -71,7 +71,6 @@ var availableDecoders = map[string]func(body io.ReadCloser) (io.ReadCloser, erro } return zr, nil }, - //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature "snappy": snappyHandler, //nolint:unparam // Ignoring the linter request to remove error return since it needs to match the method signature "lz4": func(body io.ReadCloser) (io.ReadCloser, error) { From 590d0189329d534b5912c20a0fb8b259e5366cc9 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Mon, 5 May 2025 17:53:18 +0200 Subject: [PATCH 14/14] Update .chloggen/snappy-done-right.yaml Co-authored-by: Jonathan --- .chloggen/snappy-done-right.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/snappy-done-right.yaml b/.chloggen/snappy-done-right.yaml index 0dbe63e9214..386b94d40bb 100644 --- a/.chloggen/snappy-done-right.yaml +++ b/.chloggen/snappy-done-right.yaml @@ -4,7 +4,7 @@ change_type: bug_fix # The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) -component: confighttp +component: confighttp and configcompression # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: "Fix handling of `snappy` content-encoding in a backwards-compatible way"