Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions exporter/elasticsearchexporter/attribute.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import "go.opentelemetry.io/collector/pdata/pcommon"
Expand Down
37 changes: 34 additions & 3 deletions exporter/elasticsearchexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
"encoding/base64"
"errors"
"fmt"
"net/url"
"os"
"strings"
"time"
Expand Down Expand Up @@ -182,8 +184,9 @@ const (
)

var (
errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified")
errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries")
errConfigNoEndpoint = errors.New("endpoints or cloudid must be specified")
errConfigEmptyEndpoint = errors.New("endpoints must not include empty entries")
errConfigCloudIDMutuallyExclusive = errors.New("only one of endpoints or cloudid may be specified")
)

func (m MappingMode) String() string {
Expand Down Expand Up @@ -226,19 +229,47 @@ func (cfg *Config) Validate() error {
}
}

if cfg.CloudID != "" {
if len(cfg.Endpoints) > 0 {
return errConfigCloudIDMutuallyExclusive
}
if _, err := parseCloudID(cfg.CloudID); err != nil {
return err
}
}

for _, endpoint := range cfg.Endpoints {
if endpoint == "" {
return errConfigEmptyEndpoint
}
}

if _, ok := mappingModes[cfg.Mapping.Mode]; !ok {
return fmt.Errorf("unknown mapping mode %v", cfg.Mapping.Mode)
return fmt.Errorf("unknown mapping mode %q", cfg.Mapping.Mode)
}

return nil
}

// parseCloudID extracts the Elasticsearch endpoint URL encoded in an Elastic
// Cloud ID. A Cloud ID has the form "<label>:<base64>", where the base64
// payload decodes to "<host>$<es-uuid>[$<kibana-uuid>...]"; the endpoint it
// identifies is "https://<es-uuid>.<host>".
//
// Based on "addrFromCloudID" in go-elasticsearch.
func parseCloudID(input string) (*url.URL, error) {
	_, after, ok := strings.Cut(input, ":")
	if !ok {
		return nil, fmt.Errorf("invalid CloudID %q", input)
	}

	decoded, err := base64.StdEncoding.DecodeString(after)
	if err != nil {
		// Wrap so the error identifies the offending config value instead
		// of surfacing a bare base64 error.
		return nil, fmt.Errorf("invalid CloudID %q: %w", input, err)
	}

	host, rest, ok := strings.Cut(string(decoded), "$")
	if !ok {
		return nil, fmt.Errorf("invalid decoded CloudID %q", string(decoded))
	}
	// The payload may carry additional "$"-separated UUIDs (e.g. Kibana);
	// only the first one identifies the Elasticsearch cluster.
	esID, _, _ := strings.Cut(rest, "$")
	return url.Parse(fmt.Sprintf("https://%s.%s", esID, host))
}

// MappingMode returns the mapping.mode defined in the given cfg
// object. This method must be called after cfg.Validate() has been
// called without returning an error.
Expand Down
145 changes: 77 additions & 68 deletions exporter/elasticsearchexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,72 +18,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter/internal/metadata"
)

func TestLoad_DeprecatedIndexConfigOption(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config-use-deprecated-index_option.yaml"))
require.NoError(t, err)
factory := NewFactory()
cfg := factory.CreateDefaultConfig()

sub, err := cm.Sub(component.NewIDWithName(metadata.Type, "log").String())
require.NoError(t, err)
require.NoError(t, component.UnmarshalConfig(sub, cfg))

assert.Equal(t, cfg, &Config{
QueueSettings: exporterhelper.QueueSettings{
Enabled: false,
NumConsumers: exporterhelper.NewDefaultQueueSettings().NumConsumers,
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "my_log_index",
LogsIndex: "logs-generic-default",
TracesIndex: "traces-generic-default",
Pipeline: "mypipeline",
ClientConfig: ClientConfig{
Authentication: AuthenticationSettings{
User: "elastic",
Password: "search",
APIKey: "AvFsEiPs==",
},
Timeout: 2 * time.Minute,
Headers: map[string]string{
"myheader": "test",
},
},
Discovery: DiscoverySettings{
OnStart: true,
},
Flush: FlushSettings{
Bytes: 10485760,
},
Retry: RetrySettings{
Enabled: true,
MaxRequests: 5,
InitialInterval: 100 * time.Millisecond,
MaxInterval: 1 * time.Minute,
RetryOnStatus: []int{
http.StatusTooManyRequests,
http.StatusInternalServerError,
http.StatusBadGateway,
http.StatusServiceUnavailable,
http.StatusGatewayTimeout,
},
},
Mapping: MappingsSettings{
Mode: "none",
Dedup: true,
Dedot: true,
},
LogstashFormat: LogstashFormatSettings{
Enabled: false,
PrefixSeparator: "-",
DateFormat: "%Y.%m.%d",
},
})
}

func TestLoadConfig(t *testing.T) {
func TestConfig(t *testing.T) {
t.Parallel()

defaultCfg := createDefaultConfig()
Expand Down Expand Up @@ -117,7 +52,6 @@ func TestLoadConfig(t *testing.T) {
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"https://elastic.example.com:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "",
LogsIndex: "logs-generic-default",
TracesIndex: "trace_index",
Expand Down Expand Up @@ -168,7 +102,6 @@ func TestLoadConfig(t *testing.T) {
QueueSize: exporterhelper.NewDefaultQueueSettings().QueueSize,
},
Endpoints: []string{"http://localhost:9200"},
CloudID: "TRNMxjXlNJEt",
Index: "",
LogsIndex: "my_log_index",
TracesIndex: "traces-generic-default",
Expand Down Expand Up @@ -219,6 +152,21 @@ func TestLoadConfig(t *testing.T) {
configFile: "config.yaml",
expected: defaultRawCfg,
},
{
id: component.NewIDWithName(metadata.Type, "cloudid"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY="
}),
},
{
id: component.NewIDWithName(metadata.Type, "deprecated_index"),
configFile: "config.yaml",
expected: withDefaultConfig(func(cfg *Config) {
cfg.Endpoints = []string{"https://elastic.example.com:9200"}
cfg.Index = "my_log_index"
}),
},
}

for _, tt := range tests {
Expand All @@ -239,6 +187,67 @@ func TestLoadConfig(t *testing.T) {
}
}

// TestConfig_Validate tests the error cases of Config.Validate.
//
// Successful validation should be covered by TestConfig above.
func TestConfig_Validate(t *testing.T) {
	for name, tc := range map[string]struct {
		config  *Config
		wantErr string
	}{
		"no endpoints": {
			config:  withDefaultConfig(),
			wantErr: "endpoints or cloudid must be specified",
		},
		"empty endpoint": {
			config: withDefaultConfig(func(cfg *Config) {
				cfg.Endpoints = []string{""}
			}),
			wantErr: "endpoints must not include empty entries",
		},
		"invalid cloudid": {
			config: withDefaultConfig(func(cfg *Config) {
				cfg.CloudID = "invalid"
			}),
			wantErr: `invalid CloudID "invalid"`,
		},
		"invalid decoded cloudid": {
			config: withDefaultConfig(func(cfg *Config) {
				cfg.CloudID = "foo:YWJj"
			}),
			wantErr: `invalid decoded CloudID "abc"`,
		},
		"endpoint and cloudid both set": {
			config: withDefaultConfig(func(cfg *Config) {
				cfg.Endpoints = []string{"test:9200"}
				cfg.CloudID = "foo:YmFyLmNsb3VkLmVzLmlvJGFiYzEyMyRkZWY0NTY="
			}),
			wantErr: "only one of endpoints or cloudid may be specified",
		},
		"invalid mapping mode": {
			config: withDefaultConfig(func(cfg *Config) {
				cfg.Endpoints = []string{"test:9200"}
				cfg.Mapping.Mode = "invalid"
			}),
			wantErr: `unknown mapping mode "invalid"`,
		},
	} {
		t.Run(name, func(t *testing.T) {
			assert.EqualError(t, tc.config.Validate(), tc.wantErr)
		})
	}
}

func TestConfig_Validate_Environment(t *testing.T) {
t.Setenv("ELASTICSEARCH_URL", "test:9200")
config := withDefaultConfig()
err := config.Validate()
require.NoError(t, err)
}

func withDefaultConfig(fns ...func(*Config)) *Config {
cfg := createDefaultConfig().(*Config)
for _, fn := range fns {
Expand Down
6 changes: 6 additions & 0 deletions exporter/elasticsearchexporter/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"
2 changes: 0 additions & 2 deletions exporter/elasticsearchexporter/elasticsearch_bulk.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

// Package elasticsearchexporter contains an opentelemetry-collector exporter
// for Elasticsearch.
package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter"

import (
Expand All @@ -12,11 +10,12 @@ import (
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"
)

type elasticsearchTracesExporter struct {
type elasticsearchExporter struct {
logger *zap.Logger

index string
Expand All @@ -28,7 +27,7 @@ type elasticsearchTracesExporter struct {
model mappingModel
}

func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExporter, error) {
func newExporter(logger *zap.Logger, cfg *Config, index string, dynamicIndex bool) (*elasticsearchExporter, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Non blocking/discussion] I was thinking of refactoring this a bit more (possibly in the future) to use the same instance of the exporter (and thus the bulk indexer) for both logs and traces (and eventually metrics) - IMO, this will allow us to better reason about resource consumption.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the idea, but I'm unsure how one would go about it. Is there any prior art? Probably worth opening an issue so this idea and any discussion doesn't get lost.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is SharedComponent however we can't probably use it as is.

Probably worth opening an issue so this idea and any discussion doesn't get lost.

Sounds good, I will create an issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created an issue: #33326

I have a few ideas on how this could look like. Will create a followup PR for discussions after this PR is merged.

if err := cfg.Validate(); err != nil {
return nil, err
}
Expand All @@ -49,23 +48,74 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExp
mode: cfg.MappingMode(),
}

return &elasticsearchTracesExporter{
return &elasticsearchExporter{
logger: logger,
client: client,
bulkIndexer: bulkIndexer,

index: cfg.TracesIndex,
dynamicIndex: cfg.TracesDynamicIndex.Enabled,
index: index,
dynamicIndex: dynamicIndex,
model: model,
logstashFormat: cfg.LogstashFormat,
}, nil
}

func (e *elasticsearchTracesExporter) Shutdown(ctx context.Context) error {
// Shutdown releases the exporter's resources by closing the bulk indexer,
// which flushes any buffered documents before returning.
func (e *elasticsearchExporter) Shutdown(ctx context.Context) error {
	return e.bulkIndexer.Close(ctx)
}

func (e *elasticsearchTracesExporter) pushTraceData(
// pushLogsData indexes every log record in ld. Failures on individual records
// are collected and joined into a single error rather than aborting the batch;
// the only early exit is context cancellation, which is returned immediately.
func (e *elasticsearchExporter) pushLogsData(ctx context.Context, ld plog.Logs) error {
	var errs []error

	resourceLogs := ld.ResourceLogs()
	for i := 0; i < resourceLogs.Len(); i++ {
		resourceLog := resourceLogs.At(i)
		resource := resourceLog.Resource()
		scopeLogs := resourceLog.ScopeLogs()
		for j := 0; j < scopeLogs.Len(); j++ {
			scopeLog := scopeLogs.At(j)
			scope := scopeLog.Scope()
			records := scopeLog.LogRecords()
			for k := 0; k < records.Len(); k++ {
				err := e.pushLogRecord(ctx, resource, records.At(k), scope)
				if err == nil {
					continue
				}
				// A cancelled context supersedes per-record errors.
				if cerr := ctx.Err(); cerr != nil {
					return cerr
				}
				errs = append(errs, err)
			}
		}
	}

	return errors.Join(errs...)
}

// pushLogRecord encodes a single log record into a document and enqueues it on
// the bulk indexer, resolving the target index first.
func (e *elasticsearchExporter) pushLogRecord(ctx context.Context, resource pcommon.Resource, record plog.LogRecord, scope pcommon.InstrumentationScope) error {
	fIndex := e.index
	if e.dynamicIndex {
		// Allow the target index to be overridden per record/scope/resource
		// via the well-known prefix/suffix attributes.
		prefix := getFromAttributes(indexPrefix, resource, scope, record)
		suffix := getFromAttributes(indexSuffix, resource, scope, record)

		fIndex = fmt.Sprintf("%s%s%s", prefix, fIndex, suffix)
	}

	if e.logstashFormat.Enabled {
		formattedIndex, err := generateIndexWithLogstashFormat(fIndex, &e.logstashFormat, time.Now())
		if err != nil {
			return err
		}
		fIndex = formattedIndex
	}

	document, err := e.model.encodeLog(resource, record, scope)
	if err != nil {
		// Error strings are lowercase per Go convention (was "Failed to ...").
		return fmt.Errorf("failed to encode log event: %w", err)
	}
	return pushDocuments(ctx, fIndex, document, e.bulkIndexer)
}

func (e *elasticsearchExporter) pushTraceData(
ctx context.Context,
td ptrace.Traces,
) error {
Expand Down Expand Up @@ -94,7 +144,7 @@ func (e *elasticsearchTracesExporter) pushTraceData(
return errors.Join(errs...)
}

func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
func (e *elasticsearchExporter) pushTraceRecord(ctx context.Context, resource pcommon.Resource, span ptrace.Span, scope pcommon.InstrumentationScope) error {
fIndex := e.index
if e.dynamicIndex {
prefix := getFromAttributes(indexPrefix, resource, scope, span)
Expand Down
Loading