From 59194b83b4b499a0729eee7185243b0b455bffc5 Mon Sep 17 00:00:00 2001 From: Robert Pajak Date: Tue, 22 Apr 2025 10:24:28 +0200 Subject: [PATCH 1/4] otlploghttp: Add WithTransport option --- exporters/otlp/otlplog/otlploghttp/client.go | 4 +- .../otlp/otlplog/otlploghttp/client_test.go | 21 ++++ exporters/otlp/otlplog/otlploghttp/config.go | 20 ++++ .../otlp/otlplog/otlploghttp/config_test.go | 103 ++++++++++-------- 4 files changed, 103 insertions(+), 45 deletions(-) diff --git a/exporters/otlp/otlplog/otlploghttp/client.go b/exporters/otlp/otlplog/otlploghttp/client.go index 279b4be4f60..8c7038d85fc 100644 --- a/exporters/otlp/otlplog/otlploghttp/client.go +++ b/exporters/otlp/otlplog/otlploghttp/client.go @@ -45,11 +45,11 @@ func newNoopClient() *client { // newHTTPClient creates a new HTTP log client. func newHTTPClient(cfg config) (*client, error) { hc := &http.Client{ - Transport: ourTransport, + Transport: cfg.transport.Value, Timeout: cfg.timeout.Value, } - if cfg.tlsCfg.Value != nil || cfg.proxy.Value != nil { + if cfg.transport.Value == ourTransport && (cfg.tlsCfg.Value != nil || cfg.proxy.Value != nil) { clonedTransport := ourTransport.Clone() hc.Transport = clonedTransport diff --git a/exporters/otlp/otlplog/otlploghttp/client_test.go b/exporters/otlp/otlplog/otlploghttp/client_test.go index 97ae311ca5e..cd52fa8ecdb 100644 --- a/exporters/otlp/otlplog/otlploghttp/client_test.go +++ b/exporters/otlp/otlplog/otlploghttp/client_test.go @@ -783,6 +783,27 @@ func TestConfig(t *testing.T) { assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy]) }) + t.Run("WithTransport", func(t *testing.T) { + headerKeySetInProxy := http.CanonicalHeaderKey("X-Using-Proxy") + headerValueSetInProxy := "true" + exp, coll := factoryFunc("", nil, WithTransport(&http.Transport{ + + Proxy: func(r *http.Request) (*url.URL, error) { + r.Header.Set(headerKeySetInProxy, headerValueSetInProxy) + return r.URL, nil + }, + })) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Contains(t, got, headerKeySetInProxy) + assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy]) + }) + t.Run("non-retryable errors are propagated", func(t *testing.T) { exporterErr := errors.New("missing required attribute aaaa") rCh := make(chan exportResult, 1) diff --git a/exporters/otlp/otlplog/otlploghttp/config.go b/exporters/otlp/otlplog/otlploghttp/config.go index b8952272c13..835b04582d6 100644 --- a/exporters/otlp/otlplog/otlploghttp/config.go +++ b/exporters/otlp/otlplog/otlploghttp/config.go @@ -95,6 +95,7 @@ type config struct { timeout setting[time.Duration] proxy setting[HTTPTransportProxyFunc] retryCfg setting[retry.Config] + transport setting[http.RoundTripper] } func newConfig(options []Option) config { @@ -134,6 +135,9 @@ func newConfig(options []Option) config { c.retryCfg = c.retryCfg.Resolve( fallback[retry.Config](defaultRetryCfg), ) + c.transport = c.transport.Resolve( + fallback[http.RoundTripper](ourTransport), + ) return c } @@ -344,6 +348,22 @@ func WithProxy(pf HTTPTransportProxyFunc) Option { }) } +// WithTransport sets the HTTP transport to use by the exporter's HTTP client. +// +// This option will take precedence over [WithProxy], [WithTLSClientConfig] options +// as well as OTEL_EXPORTER_OTLP_CERTIFICATE and OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE +// environment variables. +// +// Be aware that passing an transport like +// [go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp.NewTransport] can +// cause the client to be instrumented twice and cause infinite recursion. +func WithTransport(r http.RoundTripper) Option { + return fnOpt(func(c config) config { + c.transport = newSetting(r) + return c + }) +} + // setting is a configuration setting value. type setting[T any] struct { Value T diff --git a/exporters/otlp/otlplog/otlploghttp/config_test.go b/exporters/otlp/otlplog/otlploghttp/config_test.go index a6863e3805a..c6ce901c78f 100644 --- a/exporters/otlp/otlplog/otlploghttp/config_test.go +++ b/exporters/otlp/otlplog/otlploghttp/config_test.go @@ -92,10 +92,11 @@ func TestNewConfig(t *testing.T) { { name: "Defaults", want: config{ - endpoint: newSetting(defaultEndpoint), - path: newSetting(defaultPath), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting(defaultEndpoint), + path: newSetting(defaultPath), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -109,6 +110,7 @@ func TestNewConfig(t *testing.T) { WithHeaders(headers), WithTimeout(time.Second), WithRetry(RetryConfig(rc)), + WithTransport(http.DefaultTransport), // Do not test WithProxy. Requires func comparison. }, want: config{ @@ -120,6 +122,7 @@ func TestNewConfig(t *testing.T) { compression: newSetting(GzipCompression), timeout: newSetting(time.Second), retryCfg: newSetting(rc), + transport: newSetting(http.RoundTripper(http.DefaultTransport)), }, }, { @@ -128,11 +131,12 @@ func TestNewConfig(t *testing.T) { WithEndpointURL("http://test:8080/path"), }, want: config{ - endpoint: newSetting("test:8080"), - path: newSetting("/path"), - insecure: newSetting(true), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting("test:8080"), + path: newSetting("/path"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -144,11 +148,12 @@ func TestNewConfig(t *testing.T) { WithInsecure(), }, want: config{ - endpoint: newSetting("not-test:9090"), - path: newSetting("/alt"), - insecure: newSetting(true), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting("not-test:9090"), + path: newSetting("/alt"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -160,11 +165,12 @@ func TestNewConfig(t *testing.T) { WithEndpointURL("https://test:8080/path"), }, want: config{ - endpoint: newSetting("test:8080"), - path: newSetting("/path"), - insecure: newSetting(false), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting("test:8080"), + path: newSetting("/path"), + insecure: newSetting(false), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -176,11 +182,12 @@ func TestNewConfig(t *testing.T) { WithEndpointURL("https://test:8080/path"), }, want: config{ - endpoint: newSetting("test:8080"), - path: newSetting("/path"), - insecure: newSetting(false), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting("test:8080"), + path: newSetting("/path"), + insecure: newSetting(false), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -192,11 +199,12 @@ func TestNewConfig(t *testing.T) { WithEndpointURL("https://test:8080/path"), }, want: config{ - endpoint: newSetting("test:8080"), - path: newSetting("/path"), - insecure: newSetting(false), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting("test:8080"), + path: newSetting("/path"), + insecure: newSetting(false), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -219,6 +227,7 @@ func TestNewConfig(t *testing.T) { compression: newSetting(GzipCompression), timeout: newSetting(15 * time.Second), retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -227,11 +236,12 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint", }, want: config{ - endpoint: newSetting("env.endpoint"), - path: newSetting("/"), - insecure: newSetting(true), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting("env.endpoint"), + path: newSetting("/"), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -254,6 +264,7 @@ func TestNewConfig(t *testing.T) { compression: newSetting(NoCompression), timeout: newSetting(15 * time.Second), retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -262,11 +273,12 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint", }, want: config{ - endpoint: newSetting("env.endpoint"), - path: newSetting(defaultPath), - insecure: newSetting(true), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting("env.endpoint"), + path: newSetting(defaultPath), + insecure: newSetting(true), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -297,6 +309,7 @@ func TestNewConfig(t *testing.T) { compression: newSetting(GzipCompression), timeout: newSetting(15 * time.Second), retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -338,6 +351,7 @@ func TestNewConfig(t *testing.T) { compression: newSetting(GzipCompression), timeout: newSetting(time.Second), retryCfg: newSetting(rc), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -352,10 +366,11 @@ func TestNewConfig(t *testing.T) { "OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key", }, want: config{ - endpoint: newSetting(defaultEndpoint), - path: newSetting(defaultPath), - timeout: newSetting(defaultTimeout), - retryCfg: newSetting(defaultRetryCfg), + endpoint: newSetting(defaultEndpoint), + path: newSetting(defaultPath), + timeout: newSetting(defaultTimeout), + retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, errs: []string{ `invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`, @@ -387,6 +402,7 @@ func TestNewConfig(t *testing.T) { compression: newSetting(GzipCompression), timeout: newSetting(15 * time.Second), retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, { @@ -408,6 +424,7 @@ func TestNewConfig(t *testing.T) { compression: newSetting(GzipCompression), timeout: newSetting(15 * time.Second), retryCfg: newSetting(defaultRetryCfg), + transport: newSetting(http.RoundTripper(ourTransport)), }, }, } From ea252cdb17fe02b938fe84f93be5c0cbec63c9fa Mon Sep 17 00:00:00 2001 From: Robert Pajak Date: Tue, 22 Apr 2025 10:55:33 +0200 Subject: [PATCH 2/4] Add changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 687e493e21c..fd580b5510b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm The package contains semantic conventions from the `v1.31.0` version of the OpenTelemetry Semantic Conventions. See the [migration documentation](./semconv/v1.31.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.30.0`(#6479) - Add `Recording`, `Scope`, and `Record` types in `go.opentelemetry.io/otel/log/logtest`. (#6507) +- Add `WithTransport` option allowing to set `http.RoundTrip` of `http.Client` used by `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#6686) ### Removed From b6bc42c3c35ad15c4bb853d5ea920bb7faac9fea Mon Sep 17 00:00:00 2001 From: Robert Pajak Date: Tue, 22 Apr 2025 10:59:27 +0200 Subject: [PATCH 3/4] Fix lint --- exporters/otlp/otlplog/otlploghttp/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporters/otlp/otlplog/otlploghttp/config_test.go b/exporters/otlp/otlplog/otlploghttp/config_test.go index c6ce901c78f..848879d9879 100644 --- a/exporters/otlp/otlplog/otlploghttp/config_test.go +++ b/exporters/otlp/otlplog/otlploghttp/config_test.go @@ -122,7 +122,7 @@ func TestNewConfig(t *testing.T) { compression: newSetting(GzipCompression), timeout: newSetting(time.Second), retryCfg: newSetting(rc), - transport: newSetting(http.RoundTripper(http.DefaultTransport)), + transport: newSetting(http.DefaultTransport), }, }, { From c998f905c8926528d9365cf7be7b74b325952715 Mon Sep 17 00:00:00 2001 From: Robert Pajak Date: Tue, 22 Apr 2025 11:01:55 +0200 Subject: [PATCH 4/4] gofumpt --- exporters/otlp/otlplog/otlploghttp/client_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/exporters/otlp/otlplog/otlploghttp/client_test.go b/exporters/otlp/otlplog/otlploghttp/client_test.go index cd52fa8ecdb..f032bb0533d 100644 --- a/exporters/otlp/otlplog/otlploghttp/client_test.go +++ b/exporters/otlp/otlplog/otlploghttp/client_test.go @@ -787,7 +787,6 @@ func TestConfig(t *testing.T) { headerKeySetInProxy := http.CanonicalHeaderKey("X-Using-Proxy") headerValueSetInProxy := "true" exp, coll := factoryFunc("", nil, WithTransport(&http.Transport{ - Proxy: func(r *http.Request) (*url.URL, error) { r.Header.Set(headerKeySetInProxy, headerValueSetInProxy) return r.URL, nil