diff --git a/.chloggen/elasticsearchexporter_retry-on-status.yaml b/.chloggen/elasticsearchexporter_retry-on-status.yaml new file mode 100644 index 0000000000000..0d32f03d6d1e0 --- /dev/null +++ b/.chloggen/elasticsearchexporter_retry-on-status.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add retry.retry_on_status config + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [32584] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Previously, the status codes that trigger retries were hardcoded to be 429, 500, 502, 503, 504. + It is now configurable using `retry.retry_on_status`, and defaults to `[429, 500, 502, 503, 504]` to avoid a breaking change. + To avoid duplicates, it is recommended to configure `retry.retry_on_status` to `[429]`, which would be the default in a future version. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index 52490b1867e03..6567f801e6757 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -58,6 +58,7 @@ This exporter supports sending OpenTelemetry logs and traces to [Elasticsearch]( - `max_requests` (default=3): Number of HTTP request retries. - `initial_interval` (default=100ms): Initial waiting time if a HTTP request failed. - `max_interval` (default=1m): Max waiting time if a HTTP request failed. + - `retry_on_status` (default=[429, 500, 502, 503, 504]): Status codes that trigger request or document level retries. Request level retry and document level retry status codes are shared and cannot be configured separately. To avoid duplicates, it is recommended to set it to `[429]`. WARNING: The default will be changed to `[429]` in the future. - `mapping`: Events are encoded to JSON. The `mapping` allows users to configure additional mapping rules. - `mode` (default=none): The fields naming mode. valid modes are: diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index 63dfe75dde896..fd7d931459438 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -151,6 +151,9 @@ type RetrySettings struct { // MaxInterval configures the max waiting time if consecutive requests failed. MaxInterval time.Duration `mapstructure:"max_interval"` + + // RetryOnStatus configures the status codes that trigger request or document level retries. + RetryOnStatus []int `mapstructure:"retry_on_status"` } type MappingsSettings struct { diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 265d199638de9..59c6a290692ff 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -4,6 +4,7 @@ package elasticsearchexporter import ( + "net/http" "path/filepath" "testing" "time" @@ -61,6 +62,13 @@ func TestLoad_DeprecatedIndexConfigOption(t *testing.T) { MaxRequests: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, + RetryOnStatus: []int{ + http.StatusTooManyRequests, + http.StatusInternalServerError, + http.StatusBadGateway, + http.StatusServiceUnavailable, + http.StatusGatewayTimeout, + }, }, Mapping: MappingsSettings{ Mode: "none", @@ -136,6 +144,7 @@ func TestLoadConfig(t *testing.T) { MaxRequests: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, + RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, }, Mapping: MappingsSettings{ Mode: "none", @@ -186,6 +195,7 @@ func TestLoadConfig(t *testing.T) { MaxRequests: 5, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, + RetryOnStatus: []int{http.StatusTooManyRequests, http.StatusInternalServerError}, }, Mapping: MappingsSettings{ Mode: "none", diff --git a/exporter/elasticsearchexporter/elasticsearch_bulk.go b/exporter/elasticsearchexporter/elasticsearch_bulk.go index 4b4c407f9fa02..150ca0f92aa9c 100644 --- a/exporter/elasticsearchexporter/elasticsearch_bulk.go +++ b/exporter/elasticsearchexporter/elasticsearch_bulk.go @@ -103,7 +103,7 @@ func newElasticsearchClient(logger *zap.Logger, config *Config) (*esClientCurren Header: headers, // configure retry behavior - RetryOnStatus: retryOnStatus, + RetryOnStatus: config.Retry.RetryOnStatus, DisableRetry: retryDisabled, EnableRetryOnTimeout: config.Retry.Enabled, //RetryOnError: retryOnError, // should be used from esclient version 8 onwards @@ -175,7 +175,7 @@ func createElasticsearchBackoffFunc(config *RetrySettings) func(int) time.Durati } } -func shouldRetryEvent(status int) bool { +func shouldRetryEvent(status int, retryOnStatus []int) bool { for _, retryable := range retryOnStatus { if status == retryable { return true @@ -184,7 +184,7 @@ func shouldRetryEvent(status int) bool { return false } -func pushDocuments(ctx context.Context, logger *zap.Logger, index string, document []byte, bulkIndexer esBulkIndexerCurrent, maxAttempts int) error { +func pushDocuments(ctx context.Context, logger *zap.Logger, index string, document []byte, bulkIndexer esBulkIndexerCurrent, maxAttempts int, retryOnStatus []int) error { attempts := 1 body := bytes.NewReader(document) item := esBulkIndexerItem{Action: createAction, Index: index, Body: body} @@ -192,7 +192,7 @@ func pushDocuments(ctx context.Context, logger *zap.Logger, index string, docume // selective ACKing in the bulk response. item.OnFailure = func(ctx context.Context, item esBulkIndexerItem, resp esBulkIndexerResponseItem, err error) { switch { - case attempts < maxAttempts && shouldRetryEvent(resp.Status): + case attempts < maxAttempts && shouldRetryEvent(resp.Status, retryOnStatus): logger.Debug("Retrying to index", zap.String("name", index), zap.Int("attempt", attempts), diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 44bc0e0a29a67..f50a8e614ecd9 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -8,6 +8,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry import ( "context" "fmt" + "net/http" "runtime" "time" @@ -51,6 +52,13 @@ func createDefaultConfig() component.Config { MaxRequests: 3, InitialInterval: 100 * time.Millisecond, MaxInterval: 1 * time.Minute, + RetryOnStatus: []int{ + http.StatusTooManyRequests, + http.StatusInternalServerError, + http.StatusBadGateway, + http.StatusServiceUnavailable, + http.StatusGatewayTimeout, + }, }, Mapping: MappingsSettings{ Mode: "none", diff --git a/exporter/elasticsearchexporter/integrationtest/datareceiver.go b/exporter/elasticsearchexporter/integrationtest/datareceiver.go index 9dd9c99f83756..2587366738f04 100644 --- a/exporter/elasticsearchexporter/integrationtest/datareceiver.go +++ b/exporter/elasticsearchexporter/integrationtest/datareceiver.go @@ -137,7 +137,7 @@ func newMockESReceiver(params receiver.CreateSettings, cfg *config, next consume if err := next.ConsumeLogs(context.Background(), logs); err != nil { response.HasErrors = true - item.Status = http.StatusInternalServerError + item.Status = http.StatusTooManyRequests item.Error.Type = "simulated_es_error" item.Error.Reason = err.Error() } diff --git a/exporter/elasticsearchexporter/logs_exporter.go b/exporter/elasticsearchexporter/logs_exporter.go index dfc7fd5eee9a2..372ff2934cb62 100644 --- a/exporter/elasticsearchexporter/logs_exporter.go +++ b/exporter/elasticsearchexporter/logs_exporter.go @@ -23,14 +23,13 @@ type elasticsearchLogsExporter struct { logstashFormat LogstashFormatSettings dynamicIndex bool maxAttempts int + retryOnStatus []int client *esClientCurrent bulkIndexer esBulkIndexerCurrent model mappingModel } -var retryOnStatus = []int{500, 502, 503, 504, 429} - const createAction = "create" func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporter, error) { @@ -71,6 +70,7 @@ func newLogsExporter(logger *zap.Logger, cfg *Config) (*elasticsearchLogsExporte index: indexStr, dynamicIndex: cfg.LogsDynamicIndex.Enabled, maxAttempts: maxAttempts, + retryOnStatus: cfg.Retry.RetryOnStatus, model: model, logstashFormat: cfg.LogstashFormat, } @@ -129,5 +129,5 @@ func (e *elasticsearchLogsExporter) pushLogRecord(ctx context.Context, resource if err != nil { return fmt.Errorf("Failed to encode log event: %w", err) } - return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts) + return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts, e.retryOnStatus) } diff --git a/exporter/elasticsearchexporter/logs_exporter_test.go b/exporter/elasticsearchexporter/logs_exporter_test.go index 2cf90b9b75c7d..b9b3ccfcd1245 100644 --- a/exporter/elasticsearchexporter/logs_exporter_test.go +++ b/exporter/elasticsearchexporter/logs_exporter_test.go @@ -328,7 +328,7 @@ func TestExporter_PushEvent(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { if failures == 0 { failures++ - return nil, &httpTestError{message: "oops"} + return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"} } rec.Record(docs) @@ -510,7 +510,7 @@ func withTestExporterConfig(fns ...func(*Config)) func(string) *Config { } func mustSend(t *testing.T, exporter *elasticsearchLogsExporter, contents string) { - err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts) + err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts, exporter.retryOnStatus) require.NoError(t, err) } diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index 3054df30355c2..ba323702ea431 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -19,6 +19,9 @@ elasticsearch/trace: bytes: 10485760 retry: max_requests: 5 + retry_on_status: + - 429 + - 500 elasticsearch/log: tls: insecure: false @@ -38,6 +41,9 @@ elasticsearch/log: bytes: 10485760 retry: max_requests: 5 + retry_on_status: + - 429 + - 500 sending_queue: enabled: true elasticsearch/logstash_format: diff --git a/exporter/elasticsearchexporter/trace_exporter.go b/exporter/elasticsearchexporter/trace_exporter.go index 4f313f0d5546a..7153132b4975e 100644 --- a/exporter/elasticsearchexporter/trace_exporter.go +++ b/exporter/elasticsearchexporter/trace_exporter.go @@ -23,6 +23,7 @@ type elasticsearchTracesExporter struct { logstashFormat LogstashFormatSettings dynamicIndex bool maxAttempts int + retryOnStatus []int client *esClientCurrent bulkIndexer esBulkIndexerCurrent @@ -63,6 +64,7 @@ func newTracesExporter(logger *zap.Logger, cfg *Config) (*elasticsearchTracesExp index: cfg.TracesIndex, dynamicIndex: cfg.TracesDynamicIndex.Enabled, maxAttempts: maxAttempts, + retryOnStatus: cfg.Retry.RetryOnStatus, model: model, logstashFormat: cfg.LogstashFormat, }, nil @@ -122,5 +124,5 @@ func (e *elasticsearchTracesExporter) pushTraceRecord(ctx context.Context, resou if err != nil { return fmt.Errorf("Failed to encode trace record: %w", err) } - return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts) + return pushDocuments(ctx, e.logger, fIndex, document, e.bulkIndexer, e.maxAttempts, e.retryOnStatus) } diff --git a/exporter/elasticsearchexporter/traces_exporter_test.go b/exporter/elasticsearchexporter/traces_exporter_test.go index beffeaac77a61..57dd1cc415747 100644 --- a/exporter/elasticsearchexporter/traces_exporter_test.go +++ b/exporter/elasticsearchexporter/traces_exporter_test.go @@ -272,7 +272,7 @@ func TestExporter_PushTraceRecord(t *testing.T) { server := newESTestServer(t, func(docs []itemRequest) ([]itemResponse, error) { if failures == 0 { failures++ - return nil, &httpTestError{message: "oops"} + return nil, &httpTestError{status: http.StatusTooManyRequests, message: "oops"} } rec.Record(docs) @@ -463,7 +463,7 @@ func withTestTracesExporterConfig(fns ...func(*Config)) func(string) *Config { } func mustSendTraces(t *testing.T, exporter *elasticsearchTracesExporter, contents string) { - err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts) + err := pushDocuments(context.TODO(), zap.L(), exporter.index, []byte(contents), exporter.bulkIndexer, exporter.maxAttempts, exporter.retryOnStatus) require.NoError(t, err) }