Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
The package contains semantic conventions from the `v1.31.0` version of the OpenTelemetry Semantic Conventions.
See the [migration documentation](./semconv/v1.31.0/MIGRATION.md) for information on how to upgrade from `go.opentelemetry.io/otel/semconv/v1.30.0`(#6479)
- Add `Recording`, `Scope`, and `Record` types in `go.opentelemetry.io/otel/log/logtest`. (#6507)
- Add `WithTransport` option allowing to set `http.RoundTrip` of `http.Client` used by `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#6686)

### Removed

Expand Down
4 changes: 2 additions & 2 deletions exporters/otlp/otlplog/otlploghttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,11 @@ func newNoopClient() *client {
// newHTTPClient creates a new HTTP log client.
func newHTTPClient(cfg config) (*client, error) {
hc := &http.Client{
Transport: ourTransport,
Transport: cfg.transport.Value,
Timeout: cfg.timeout.Value,
}

if cfg.tlsCfg.Value != nil || cfg.proxy.Value != nil {
if cfg.transport.Value == ourTransport && (cfg.tlsCfg.Value != nil || cfg.proxy.Value != nil) {
clonedTransport := ourTransport.Clone()
hc.Transport = clonedTransport

Expand Down
20 changes: 20 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,6 +783,26 @@ func TestConfig(t *testing.T) {
assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy])
})

t.Run("WithTransport", func(t *testing.T) {
headerKeySetInProxy := http.CanonicalHeaderKey("X-Using-Proxy")
headerValueSetInProxy := "true"
exp, coll := factoryFunc("", nil, WithTransport(&http.Transport{
Proxy: func(r *http.Request) (*url.URL, error) {
r.Header.Set(headerKeySetInProxy, headerValueSetInProxy)
return r.URL, nil
},
}))
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, coll.Shutdown(ctx)) })
require.NoError(t, exp.Export(ctx, make([]log.Record, 1)))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

got := coll.Headers()
require.Contains(t, got, headerKeySetInProxy)
assert.Equal(t, []string{headerValueSetInProxy}, got[headerKeySetInProxy])
})

t.Run("non-retryable errors are propagated", func(t *testing.T) {
exporterErr := errors.New("missing required attribute aaaa")
rCh := make(chan exportResult, 1)
Expand Down
20 changes: 20 additions & 0 deletions exporters/otlp/otlplog/otlploghttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type config struct {
timeout setting[time.Duration]
proxy setting[HTTPTransportProxyFunc]
retryCfg setting[retry.Config]
transport setting[http.RoundTripper]
}

func newConfig(options []Option) config {
Expand Down Expand Up @@ -134,6 +135,9 @@ func newConfig(options []Option) config {
c.retryCfg = c.retryCfg.Resolve(
fallback[retry.Config](defaultRetryCfg),
)
c.transport = c.transport.Resolve(
fallback[http.RoundTripper](ourTransport),
)

return c
}
Expand Down Expand Up @@ -344,6 +348,22 @@ func WithProxy(pf HTTPTransportProxyFunc) Option {
})
}

// WithTransport sets the HTTP transport to use by the exporter's HTTP client.
//
// This option will take precedence over [WithProxy], [WithTLSClientConfig] options
// as well as OTEL_EXPORTER_OTLP_CERTIFICATE and OTEL_EXPORTER_OTLP_LOGS_CERTIFICATE
// environment variables.
//
// Be aware that passing an transport like
// [go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp.NewTransport] can
// cause the client to be instrumented twice and cause infinite recursion.
func WithTransport(r http.RoundTripper) Option {
return fnOpt(func(c config) config {
c.transport = newSetting(r)
return c
})
}

// setting is a configuration setting value.
type setting[T any] struct {
Value T
Expand Down
103 changes: 60 additions & 43 deletions exporters/otlp/otlplog/otlploghttp/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ func TestNewConfig(t *testing.T) {
{
name: "Defaults",
want: config{
endpoint: newSetting(defaultEndpoint),
path: newSetting(defaultPath),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting(defaultEndpoint),
path: newSetting(defaultPath),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -109,6 +110,7 @@ func TestNewConfig(t *testing.T) {
WithHeaders(headers),
WithTimeout(time.Second),
WithRetry(RetryConfig(rc)),
WithTransport(http.DefaultTransport),
// Do not test WithProxy. Requires func comparison.
},
want: config{
Expand All @@ -120,6 +122,7 @@ func TestNewConfig(t *testing.T) {
compression: newSetting(GzipCompression),
timeout: newSetting(time.Second),
retryCfg: newSetting(rc),
transport: newSetting(http.DefaultTransport),
},
},
{
Expand All @@ -128,11 +131,12 @@ func TestNewConfig(t *testing.T) {
WithEndpointURL("http://test:8080/path"),
},
want: config{
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -144,11 +148,12 @@ func TestNewConfig(t *testing.T) {
WithInsecure(),
},
want: config{
endpoint: newSetting("not-test:9090"),
path: newSetting("/alt"),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting("not-test:9090"),
path: newSetting("/alt"),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -160,11 +165,12 @@ func TestNewConfig(t *testing.T) {
WithEndpointURL("https://test:8080/path"),
},
want: config{
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(false),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(false),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -176,11 +182,12 @@ func TestNewConfig(t *testing.T) {
WithEndpointURL("https://test:8080/path"),
},
want: config{
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(false),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(false),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -192,11 +199,12 @@ func TestNewConfig(t *testing.T) {
WithEndpointURL("https://test:8080/path"),
},
want: config{
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(false),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting("test:8080"),
path: newSetting("/path"),
insecure: newSetting(false),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -219,6 +227,7 @@ func TestNewConfig(t *testing.T) {
compression: newSetting(GzipCompression),
timeout: newSetting(15 * time.Second),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -227,11 +236,12 @@ func TestNewConfig(t *testing.T) {
"OTEL_EXPORTER_OTLP_LOGS_ENDPOINT": "http://env.endpoint",
},
want: config{
endpoint: newSetting("env.endpoint"),
path: newSetting("/"),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting("env.endpoint"),
path: newSetting("/"),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -254,6 +264,7 @@ func TestNewConfig(t *testing.T) {
compression: newSetting(NoCompression),
timeout: newSetting(15 * time.Second),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -262,11 +273,12 @@ func TestNewConfig(t *testing.T) {
"OTEL_EXPORTER_OTLP_ENDPOINT": "http://env.endpoint",
},
want: config{
endpoint: newSetting("env.endpoint"),
path: newSetting(defaultPath),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting("env.endpoint"),
path: newSetting(defaultPath),
insecure: newSetting(true),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand Down Expand Up @@ -297,6 +309,7 @@ func TestNewConfig(t *testing.T) {
compression: newSetting(GzipCompression),
timeout: newSetting(15 * time.Second),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand Down Expand Up @@ -338,6 +351,7 @@ func TestNewConfig(t *testing.T) {
compression: newSetting(GzipCompression),
timeout: newSetting(time.Second),
retryCfg: newSetting(rc),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -352,10 +366,11 @@ func TestNewConfig(t *testing.T) {
"OTEL_EXPORTER_OTLP_LOGS_CLIENT_KEY": "invalid_key",
},
want: config{
endpoint: newSetting(defaultEndpoint),
path: newSetting(defaultPath),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
endpoint: newSetting(defaultEndpoint),
path: newSetting(defaultPath),
timeout: newSetting(defaultTimeout),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
errs: []string{
`invalid OTEL_EXPORTER_OTLP_LOGS_ENDPOINT value %invalid: parse "%invalid": invalid URL escape "%in"`,
Expand Down Expand Up @@ -387,6 +402,7 @@ func TestNewConfig(t *testing.T) {
compression: newSetting(GzipCompression),
timeout: newSetting(15 * time.Second),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
{
Expand All @@ -408,6 +424,7 @@ func TestNewConfig(t *testing.T) {
compression: newSetting(GzipCompression),
timeout: newSetting(15 * time.Second),
retryCfg: newSetting(defaultRetryCfg),
transport: newSetting(http.RoundTripper(ourTransport)),
},
},
}
Expand Down
Loading