diff --git a/exporters/otlp/otlplog/otlploghttp/client.go b/exporters/otlp/otlplog/otlploghttp/client.go index 279b4be4f60..d54b8bbd3e6 100644 --- a/exporters/otlp/otlplog/otlploghttp/client.go +++ b/exporters/otlp/otlplog/otlploghttp/client.go @@ -86,20 +86,22 @@ func newHTTPClient(cfg config) (*client, error) { req.Header.Set("Content-Type", "application/x-protobuf") c := &httpClient{ - compression: cfg.compression.Value, - req: req, - requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate), - client: hc, + compression: cfg.compression.Value, + req: req, + requestFunc: cfg.retryCfg.Value.RequestFunc(evaluate), + client: hc, + headersProvider: cfg.headersProvider.Value, } return &client{uploadLogs: c.uploadLogs}, nil } type httpClient struct { // req is cloned for every upload the client makes. - req *http.Request - compression Compression - requestFunc retry.RequestFunc - client *http.Client + req *http.Request + compression Compression + requestFunc retry.RequestFunc + client *http.Client + headersProvider HeadersProviderFunc } // Keep it in sync with golang's DefaultTransport from net/http! We @@ -227,6 +229,14 @@ func (c *httpClient) newRequest(ctx context.Context, body []byte) (request, erro r := c.req.Clone(ctx) req := request{Request: r} + headers, err := c.headersProvider() + if err != nil { + return req, fmt.Errorf("failed to execute headers provider: %w", err) + } + for k, v := range headers { + r.Header.Set(k, v) + } + switch c.compression { case NoCompression: r.ContentLength = (int64)(len(body)) diff --git a/exporters/otlp/otlplog/otlploghttp/client_test.go b/exporters/otlp/otlplog/otlploghttp/client_test.go index 607dea9f06d..6277c97ae57 100644 --- a/exporters/otlp/otlplog/otlploghttp/client_test.go +++ b/exporters/otlp/otlplog/otlploghttp/client_test.go @@ -635,6 +635,28 @@ func TestConfig(t *testing.T) { assert.Equal(t, []string{headers[key]}, got[key]) }) + t.Run("WithHeadersProvider", func(t *testing.T) { + key := http.CanonicalHeaderKey("my-custom-header") + key2 := http.CanonicalHeaderKey("my-provided-custom-header") + headers := map[string]string{key: "custom-value"} + providedHeaders := map[string]string{key: "custom-value-override", key2: "provided-custom-value"} + headersProvider := func() (map[string]string, error) { return providedHeaders, nil } + exp, coll := factoryFunc("", nil, WithHeaders(headers), WithHeadersProvider(headersProvider)) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, make([]log.Record, 1))) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Regexp(t, "OTel Go OTLP over HTTP/protobuf logs exporter/[01]\\..*", got) + require.Contains(t, got, key) + // HeadersProviderFunc overrides Headers + assert.Equal(t, []string{providedHeaders[key]}, got[key]) + // HeaderProviderFunc values merged with Headers + assert.Equal(t, []string{providedHeaders[key2]}, got[key2]) + }) + t.Run("WithTimeout", func(t *testing.T) { // Do not send on rCh so the Collector never responds to the client. rCh := make(chan exportResult) diff --git a/exporters/otlp/otlplog/otlploghttp/config.go b/exporters/otlp/otlplog/otlploghttp/config.go index bfe768091e3..57d4d63960a 100644 --- a/exporters/otlp/otlplog/otlploghttp/config.go +++ b/exporters/otlp/otlplog/otlploghttp/config.go @@ -22,11 +22,12 @@ import ( // Default values. var ( - defaultEndpoint = "localhost:4318" - defaultPath = "/v1/logs" - defaultTimeout = 10 * time.Second - defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment - defaultRetryCfg = retry.DefaultConfig + defaultEndpoint = "localhost:4318" + defaultPath = "/v1/logs" + defaultTimeout = 10 * time.Second + defaultProxy HTTPTransportProxyFunc = http.ProxyFromEnvironment + defaultRetryCfg = retry.DefaultConfig + defaultHeadersProvider HeadersProviderFunc = func() (map[string]string, error) { return map[string]string{}, nil } ) // Environment variable keys. @@ -85,15 +86,16 @@ type fnOpt func(config) config func (f fnOpt) applyHTTPOption(c config) config { return f(c) } type config struct { - endpoint setting[string] - path setting[string] - insecure setting[bool] - tlsCfg setting[*tls.Config] - headers setting[map[string]string] - compression setting[Compression] - timeout setting[time.Duration] - proxy setting[HTTPTransportProxyFunc] - retryCfg setting[retry.Config] + endpoint setting[string] + path setting[string] + insecure setting[bool] + tlsCfg setting[*tls.Config] + headers setting[map[string]string] + headersProvider setting[HeadersProviderFunc] + compression setting[Compression] + timeout setting[time.Duration] + proxy setting[HTTPTransportProxyFunc] + retryCfg setting[retry.Config] } func newConfig(options []Option) config { @@ -117,6 +119,9 @@ func newConfig(options []Option) config { c.tlsCfg = c.tlsCfg.Resolve( loadEnvTLS[*tls.Config](), ) + c.headersProvider = c.headersProvider.Resolve( + fallback[HeadersProviderFunc](defaultHeadersProvider), + ) c.headers = c.headers.Resolve( getenv[map[string]string](envHeaders, convHeaders), ) @@ -286,6 +291,16 @@ func WithHeaders(headers map[string]string) Option { }) } +type HeadersProviderFunc func() (map[string]string, error) + +// WithHeadersProvider will be called to set the provided headers with each HTTP requests. +func WithHeadersProvider(providerFunc HeadersProviderFunc) Option { + return fnOpt(func(c config) config { + c.headersProvider = newSetting(providerFunc) + return c + }) +} + // WithTimeout sets the max amount of time an Exporter will attempt an export. // // This takes precedence over any retry settings defined by WithRetry. Once diff --git a/exporters/otlp/otlplog/otlploghttp/config_test.go b/exporters/otlp/otlplog/otlploghttp/config_test.go index 1a7568921db..5eb549bf523 100644 --- a/exporters/otlp/otlplog/otlploghttp/config_test.go +++ b/exporters/otlp/otlplog/otlploghttp/config_test.go @@ -109,7 +109,7 @@ func TestNewConfig(t *testing.T) { WithHeaders(headers), WithTimeout(time.Second), WithRetry(RetryConfig(rc)), - // Do not test WithProxy. Requires func comparison. + // Do not test WithProxy or WithHeaderProvider. Requires func comparison. }, want: config{ endpoint: newSetting("test"), @@ -395,6 +395,9 @@ func TestNewConfig(t *testing.T) { // Cannot compare funcs, see TestWithProxy. c.proxy = setting[HTTPTransportProxyFunc]{} + // Cannot compare funcs, see TestWithHeaderProvider + c.headersProvider = setting[HeadersProviderFunc]{} + assert.Equal(t, tc.want, c) for _, errMsg := range tc.errs { @@ -436,3 +439,21 @@ func TestWithProxy(t *testing.T) { assert.True(t, c.proxy.Set) assert.NotNil(t, c.proxy.Value) } + +func TestWithHeadersProvider(t *testing.T) { + key := "a" + expectedHeaders := map[string]string{key: "A"} + provider := func() (map[string]string, error) { + return expectedHeaders, nil + } + opts := []Option{WithHeadersProvider(provider)} + c := newConfig(opts) + + assert.True(t, c.headersProvider.Set) + assert.NotNil(t, c.headersProvider.Value) + + h, err := c.headersProvider.Value() + + assert.Equal(t, h[key], expectedHeaders[key]) + assert.Nil(t, err) +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 86da30e3754..dcbd036ad4c 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -30,10 +30,11 @@ import ( type client struct { // req is cloned for every upload the client makes. - req *http.Request - compression Compression - requestFunc retry.RequestFunc - httpClient *http.Client + req *http.Request + compression Compression + requestFunc retry.RequestFunc + httpClient *http.Client + headersProvider HeadersProviderFunc } // Keep it in sync with golang's DefaultTransport from net/http! We @@ -97,10 +98,11 @@ func newClient(cfg oconf.Config) (*client, error) { req.Header.Set("Content-Type", "application/x-protobuf") return &client{ - compression: Compression(cfg.Metrics.Compression), - req: req, - requestFunc: cfg.RetryConfig.RequestFunc(evaluate), - httpClient: httpClient, + compression: Compression(cfg.Metrics.Compression), + req: req, + requestFunc: cfg.RetryConfig.RequestFunc(evaluate), + httpClient: httpClient, + headersProvider: HeadersProviderFunc(cfg.Metrics.HeadersProvider), }, nil } @@ -230,6 +232,16 @@ func (c *client) newRequest(ctx context.Context, body []byte) (request, error) { r := c.req.Clone(ctx) req := request{Request: r} + if c.headersProvider != nil { + headers, err := c.headersProvider() + if err != nil { + return req, fmt.Errorf("failed to execute headers provider: %w", err) + } + for k, v := range headers { + r.Header.Set(k, v) + } + } + switch c.compression { case NoCompression: r.ContentLength = (int64)(len(body)) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index f600cc45840..2085fb07f27 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -127,6 +127,24 @@ func TestConfig(t *testing.T) { assert.Equal(t, []string{headers[key]}, got[key]) }) + t.Run("WithHeadersProvider", func(t *testing.T) { + key := http.CanonicalHeaderKey("my-custom-header") + headers := map[string]string{key: "custom-value"} + exp, coll := factoryFunc("", nil, WithHeadersProvider(func() (map[string]string, error) { + return headers, nil + })) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) }) + require.NoError(t, exp.Export(ctx, &metricdata.ResourceMetrics{})) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Regexp(t, "OTel Go OTLP over HTTP/protobuf metrics exporter/[01]\\..*", got) + require.Contains(t, got, key) + assert.Equal(t, []string{headers[key]}, got[key]) + }) + t.Run("WithTimeout", func(t *testing.T) { // Do not send on rCh so the Collector never responds to the client. rCh := make(chan otest.ExportResult) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/config.go b/exporters/otlp/otlpmetric/otlpmetrichttp/config.go index bf05adcf1b1..baff2089f67 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/config.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/config.go @@ -23,6 +23,9 @@ type Compression oconf.Compression // to the OTLP HTTP client. type HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) +// HeadersProviderFunc is a function which resolves to the headers to use for a given request. +type HeadersProviderFunc func() (map[string]string, error) + const ( // NoCompression tells the driver to send payloads without // compression. @@ -167,6 +170,11 @@ func WithHeaders(headers map[string]string) Option { return wrappedOption{oconf.WithHeaders(headers)} } +// WithHeadersProvider will be called to set the provided headers with each HTTP requests. +func WithHeadersProvider(providerFunc HeadersProviderFunc) Option { + return wrappedOption{oconf.WithHeadersProvider(oconf.HeadersProviderFunc(providerFunc))} +} + // WithTimeout sets the max amount of time an Exporter will attempt an export. // // This takes precedence over any retry settings defined by WithRetry. Once diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options.go index db595e49ec2..c499a08c1ec 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options.go @@ -47,14 +47,18 @@ type ( // This type is compatible with `http.Transport.Proxy` and can be used to set a custom proxy function to the OTLP HTTP client. HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) + // HeadersProviderFunc is a function which resolves to the headers to use for a given request. + HeadersProviderFunc func() (map[string]string, error) + SignalConfig struct { - Endpoint string - Insecure bool - TLSCfg *tls.Config - Headers map[string]string - Compression Compression - Timeout time.Duration - URLPath string + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + HeadersProvider HeadersProviderFunc + Compression Compression + Timeout time.Duration + URLPath string // gRPC configurations GRPCCredentials credentials.TransportCredentials @@ -345,6 +349,13 @@ func WithHeaders(headers map[string]string) GenericOption { }) } +func WithHeadersProvider(hpf HeadersProviderFunc) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Metrics.HeadersProvider = hpf + return cfg + }) +} + func WithTimeout(duration time.Duration) GenericOption { return newGenericOption(func(cfg Config) Config { cfg.Metrics.Timeout = duration diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options_test.go index 36858bc3077..b7581b5b5fe 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf/options_test.go @@ -395,6 +395,44 @@ func TestConfigs(t *testing.T) { }, }, + // Headers Provider tests + { + name: " Test with Headers Provider", + opts: []GenericOption{ + WithHeadersProvider(func() (map[string]string, error) { + return map[string]string{"key": "value"}, nil + }), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.NotNil(t, c.Metrics.HeadersProvider) + headers, err := c.Metrics.HeadersProvider() + assert.NoError(t, err) + assert.Equal(t, map[string]string{"key": "value"}, headers) + }, + }, + { + name: " Test with Headers and Headers Provider", + opts: []GenericOption{ + WithHeaders(map[string]string{"key": "value"}), + WithHeadersProvider(func() (map[string]string, error) { + return map[string]string{"key": "value-override"}, nil + }), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.NotNil(t, c.Metrics.HeadersProvider) + headers, err := c.Metrics.HeadersProvider() + assert.NoError(t, err) + assert.Equal(t, map[string]string{"key": "value-override"}, headers) + }, + }, + { + name: " Test without Headers Provider", + opts: []GenericOption{}, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Nil(t, c.Metrics.HeadersProvider) + }, + }, + // Compression Tests { name: "Test With Compression", diff --git a/exporters/otlp/otlptrace/otlptracehttp/client.go b/exporters/otlp/otlptrace/otlptracehttp/client.go index 16c006b2cfd..00aee3a5f83 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client.go @@ -237,6 +237,16 @@ func (d *client) newRequest(body []byte) (request, error) { for k, v := range d.cfg.Headers { r.Header.Set(k, v) } + + if d.cfg.HeadersProvider != nil { + headers, err := d.cfg.HeadersProvider() + if err != nil { + return request{Request: r}, fmt.Errorf("failed to execute headers provider: %w", err) + } + for k, v := range headers { + r.Header.Set(k, v) + } + } r.Header.Set("Content-Type", contentTypeProto) req := request{Request: r} diff --git a/exporters/otlp/otlptrace/otlptracehttp/client_test.go b/exporters/otlp/otlptrace/otlptracehttp/client_test.go index 84e9ab7e655..fe91150f20e 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/client_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/client_test.go @@ -169,6 +169,17 @@ func TestEndToEnd(t *testing.T) { ExpectedHeaders: customProxyHeader, }, }, + { + name: "with headers proivder", + opts: []otlptracehttp.Option{ + otlptracehttp.WithHeadersProvider(func() (map[string]string, error) { + return testHeaders, nil + }), + }, + mcCfg: mockCollectorConfig{ + ExpectedHeaders: testHeaders, + }, + }, } for _, tc := range tests { diff --git a/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options.go b/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options.go index 6a9c4d3a652..2a112f37af6 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options.go +++ b/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options.go @@ -40,14 +40,18 @@ type ( // This type is compatible with `http.Transport.Proxy` and can be used to set a custom proxy function to the OTLP HTTP client. HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) + // HeadersProviderFunc is a function which resolves to the headers to use for a given request. + HeadersProviderFunc func() (map[string]string, error) + SignalConfig struct { - Endpoint string - Insecure bool - TLSCfg *tls.Config - Headers map[string]string - Compression Compression - Timeout time.Duration - URLPath string + Endpoint string + Insecure bool + TLSCfg *tls.Config + Headers map[string]string + HeadersProvider HeadersProviderFunc + Compression Compression + Timeout time.Duration + URLPath string // gRPC configurations GRPCCredentials credentials.TransportCredentials @@ -336,6 +340,13 @@ func WithHeaders(headers map[string]string) GenericOption { }) } +func WithHeaderProvider(hpf HeadersProviderFunc) GenericOption { + return newGenericOption(func(cfg Config) Config { + cfg.Traces.HeadersProvider = hpf + return cfg + }) +} + func WithTimeout(duration time.Duration) GenericOption { return newGenericOption(func(cfg Config) Config { cfg.Traces.Timeout = duration diff --git a/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options_test.go b/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options_test.go index 55a3ad96986..ccd0fd915db 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options_test.go +++ b/exporters/otlp/otlptrace/otlptracehttp/internal/otlpconfig/options_test.go @@ -483,6 +483,43 @@ func TestConfigs(t *testing.T) { assert.Nil(t, c.Traces.Proxy) }, }, + // Headers Provider Tests + { + name: "Test With HeadersProvider", + opts: []GenericOption{ + WithHeaderProvider(func() (map[string]string, error) { + return map[string]string{"key": "value"}, nil + }), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.NotNil(t, c.Traces.HeadersProvider) + headers, err := c.Traces.HeadersProvider() + assert.NoError(t, err) + assert.Equal(t, headers["key"], "value") + }, + }, + { + name: "Test With Headers and HeadersProvider", + opts: []GenericOption{ + WithHeaders(map[string]string{"key": "value"}), + WithHeaderProvider(func() (map[string]string, error) { + return map[string]string{"key": "value-override"}, nil + }), + }, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.NotNil(t, c.Traces.HeadersProvider) + headers, err := c.Traces.HeadersProvider() + assert.NoError(t, err) + assert.Equal(t, headers["key"], "value-override") + }, + }, + { + name: "Test Without HeadersProvider", + opts: []GenericOption{}, + asserts: func(t *testing.T, c *Config, grpcOption bool) { + assert.Nil(t, c.Traces.HeadersProvider) + }, + }, } for _, tt := range tests { diff --git a/exporters/otlp/otlptrace/otlptracehttp/options.go b/exporters/otlp/otlptrace/otlptracehttp/options.go index 3559c5664f4..ff672c78fc4 100644 --- a/exporters/otlp/otlptrace/otlptracehttp/options.go +++ b/exporters/otlp/otlptrace/otlptracehttp/options.go @@ -22,6 +22,9 @@ type Compression otlpconfig.Compression // to the OTLP HTTP client. type HTTPTransportProxyFunc func(*http.Request) (*url.URL, error) +// HeadersProviderFunc is a function which resolves to the headers to use for a given request. +type HeadersProviderFunc func() (map[string]string, error) + const ( // NoCompression tells the driver to send payloads without // compression. @@ -132,6 +135,11 @@ func WithHeaders(headers map[string]string) Option { return wrappedOption{otlpconfig.WithHeaders(headers)} } +// WithHeadersProvider will be called to set the provided headers with each HTTP requests. +func WithHeadersProvider(headersProviderFunc HeadersProviderFunc) Option { + return wrappedOption{otlpconfig.WithHeaderProvider(otlpconfig.HeadersProviderFunc(headersProviderFunc))} +} + // WithTimeout tells the driver the max waiting time for the backend to process // each spans batch. If unset, the default will be 10 seconds. func WithTimeout(duration time.Duration) Option {