1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -33,6 +33,7 @@
-
### 💡 Enhancements 💡

- Make the in-memory and persistent queues more consistent (#5764)
- `ocb` now exits with an error if it fails to load the build configuration. (#5731)
- Deprecate `HTTPClientSettings.ToClientWithHost` (#5737)

Expand Down
14 changes: 11 additions & 3 deletions exporter/exporterhelper/internal/persistent_queue.go
@@ -19,10 +19,12 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe

import (
"context"
"fmt"
"sync"

"go.uber.org/zap"

"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/extension/experimental/storage"
)

@@ -36,12 +38,18 @@ type persistentQueue struct {
storage persistentStorage
}

// NewPersistentQueue creates a new queue backed by file storage; name parameter must be a unique value that identifies the queue
func NewPersistentQueue(ctx context.Context, name string, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
// buildPersistentStorageName returns a name constructed from the queue name and signal type. This is done
// to avoid conflicts between different signals, which each require a unique persistent storage name.
func buildPersistentStorageName(name string, signal config.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(ctx context.Context, name string, signal config.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
return &persistentQueue{
logger: logger,
stopChan: make(chan struct{}),
storage: newPersistentContiguousStorage(ctx, name, uint64(capacity), logger, client, unmarshaler),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler),
}
}
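
For illustration only (not part of the diff), here is a minimal standalone sketch of the naming scheme, assuming a hypothetical "otlp" exporter and the standard config.DataType string values ("traces", "metrics", "logs"):

package main

import "fmt"

// buildPersistentStorageName mirrors the helper added above: the queue name
// and the signal type are joined so that the traces, metrics, and logs
// queues of one exporter never collide in the shared storage extension.
func buildPersistentStorageName(name, signal string) string {
	return fmt.Sprintf("%s-%s", name, signal)
}

func main() {
	// "otlp" is a hypothetical exporter name; the signal strings assume
	// the standard config.DataType values.
	fmt.Println(buildPersistentStorageName("otlp", "traces"))  // otlp-traces
	fmt.Println(buildPersistentStorageName("otlp", "metrics")) // otlp-metrics
	fmt.Println(buildPersistentStorageName("otlp", "logs"))    // otlp-logs
}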

@@ -42,7 +42,7 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue
panic(err)
}

wq := NewPersistentQueue(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
wq := NewPersistentQueue(context.Background(), "foo", config.TracesDataType, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
return wq.(*persistentQueue)
}

17 changes: 6 additions & 11 deletions exporter/exporterhelper/queued_retry_experimental.go
@@ -81,6 +81,7 @@
)

type queuedRetrySender struct {
fullName string
id config.ComponentID
signal config.DataType
cfg QueueSettings
@@ -93,19 +94,13 @@ type queuedRetrySender struct {
requestUnmarshaler internal.RequestUnmarshaler
}

func (qrs *queuedRetrySender) fullName() string {
if qrs.signal == "" {
return qrs.id.String()
}
return fmt.Sprintf("%s-%s", qrs.id.String(), qrs.signal)
}

func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg QueueSettings, rCfg RetrySettings, reqUnmarshaler internal.RequestUnmarshaler, nextSender requestSender, logger *zap.Logger) *queuedRetrySender {
retryStopCh := make(chan struct{})
sampledLogger := createSampledLogger(logger)
traceAttr := attribute.String(obsmetrics.ExporterKey, id.String())

qrs := &queuedRetrySender{
fullName: id.String(),
id: id,
signal: signal,
cfg: qCfg,
@@ -164,7 +159,7 @@ func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, hos
return err
}

qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName(), qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler)
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler)

// TODO: this can be further exposed as a config param rather than relying on a type of queue
qrs.requeuingEnabled = true
@@ -215,13 +210,13 @@ func (qrs *queuedRetrySender) start(ctx context.Context, host component.Host) er
if qrs.cfg.Enabled {
err := globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(qrs.queue.Size())
}, metricdata.NewLabelValue(qrs.fullName()))
}, metricdata.NewLabelValue(qrs.fullName))
if err != nil {
return fmt.Errorf("failed to create retry queue size metric: %w", err)
}
err = globalInstruments.queueCapacity.UpsertEntry(func() int64 {
return int64(qrs.cfg.QueueSize)
}, metricdata.NewLabelValue(qrs.fullName()))
}, metricdata.NewLabelValue(qrs.fullName))
if err != nil {
return fmt.Errorf("failed to create retry queue capacity metric: %w", err)
}
@@ -236,7 +231,7 @@ func (qrs *queuedRetrySender) shutdown() {
if qrs.cfg.Enabled {
_ = globalInstruments.queueSize.UpsertEntry(func() int64 {
return int64(0)
}, metricdata.NewLabelValue(qrs.fullName()))
}, metricdata.NewLabelValue(qrs.fullName))
}

// First, stop the retry goroutines so that the queue numWorkers unblock.
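
A small standalone sketch of the effective renaming (hypothetical "otlp" exporter ID; assumes config.DataType stringifies to "traces"): previously fullName() appended the signal and served both the queue metric labels and the persistent storage name; with this change the metric label is the bare component ID and only the storage name carries the signal suffix.

package main

import "fmt"

func main() {
	const id, signal = "otlp", "traces" // hypothetical exporter ID and signal

	// Before: fullName() derived one name used for both the metric
	// labels and the persistent storage name.
	before := fmt.Sprintf("%s-%s", id, signal)
	fmt.Println("metric label and storage name:", before) // otlp-traces

	// After: the metric label is the bare component ID, and the signal
	// suffix is applied only when building the persistent storage name.
	fmt.Println("metric label:", id)                               // otlp
	fmt.Println("storage name:", fmt.Sprintf("%s-%s", id, signal)) // otlp-traces
}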
217 changes: 217 additions & 0 deletions exporter/exporterhelper/queued_retry_experimental_test.go
@@ -0,0 +1,217 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build enable_unstable
// +build enable_unstable

package exporterhelper

import (
"context"
"errors"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/extension/experimental/storage"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
)

type mockHost struct {
component.Host
ext map[config.ComponentID]component.Extension
}

func (nh *mockHost) GetExtensions() map[config.ComponentID]component.Extension {
return nh.ext
}

type mockStorageExtension struct {
GetClientError error
}

func (mse *mockStorageExtension) Start(_ context.Context, _ component.Host) error {
return nil
}

func (mse *mockStorageExtension) Shutdown(_ context.Context) error {
return nil
}

func (mse *mockStorageExtension) GetClient(_ context.Context, _ component.Kind, _ config.ComponentID, _ string) (storage.Client, error) {
if mse.GetClientError != nil {
return nil, mse.GetClientError
}
return storage.NewNopClient(), nil
}

func TestGetRetrySettings(t *testing.T) {
getStorageClientError := errors.New("unable to create storage client")
testCases := []struct {
desc string
storage storage.Extension
numStorages int
storageEnabled bool
expectedError error
getClientError error
}{
{
desc: "no storage selected",
numStorages: 0,
expectedError: errNoStorageClient,
},
{
desc: "obtain default storage extension",
numStorages: 1,
storageEnabled: true,
expectedError: nil,
},
{
desc: "fail on obtaining default storage extension",
numStorages: 2,
storageEnabled: true,
expectedError: errMultipleStorageClients,
},
{
desc: "fail on error getting storage client from extension",
numStorages: 1,
storageEnabled: true,
expectedError: getStorageClientError,
getClientError: getStorageClientError,
},
}

for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
// prepare
var extensions = map[config.ComponentID]component.Extension{}
for i := 0; i < tC.numStorages; i++ {
extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError}
}
host := &mockHost{ext: extensions}
ownerID := config.NewComponentID("foo_exporter")

// execute
client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType)

// verify
if tC.expectedError != nil {
assert.ErrorIs(t, err, tC.expectedError)
assert.Nil(t, client)
} else {
assert.NoError(t, err)
assert.NotNil(t, client)
}
})
}
}

// if requeueing is enabled, we eventually retry even if we failed at first
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 1
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
ocs := newObservabilityConsumerSender(be.qrSender.consumerSender)
be.qrSender.consumerSender = ocs
be.qrSender.requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1))
mockR := newMockRequest(context.Background(), 1, traceErr)
ocs.run(func() {
// This is asynchronous so it should just enqueue, no errors expected.
require.NoError(t, be.sender.send(mockR))
ocs.waitGroup.Add(1) // necessary because we'll call send() again after requeueing
})
ocs.awaitAsyncProcessing()

// In the newMockConcurrentExporter we count requests and items even for failed requests
mockR.checkNumRequests(t, 2)
ocs.checkSendItemsCount(t, 1)
ocs.checkDroppedItemsCount(t, 1) // not actually dropped, but ocs counts each failed send here
}

// if requeueing is enabled, but the queue is full, we get an error
func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) {
qCfg := NewDefaultQueueSettings()
qCfg.NumConsumers = 0
qCfg.QueueSize = 0
rCfg := NewDefaultRetrySettings()
rCfg.MaxElapsedTime = time.Nanosecond // we don't want to retry at all, but requeue instead
be := newBaseExporter(&defaultExporterCfg, componenttest.NewNopExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
be.qrSender.requeuingEnabled = true
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
t.Cleanup(func() {
assert.NoError(t, be.Shutdown(context.Background()))
})

traceErr := consumererror.NewTraces(errors.New("some error"), testdata.GenerateTraces(1))
mockR := newMockRequest(context.Background(), 1, traceErr)

require.Error(t, be.qrSender.consumerSender.send(mockR), "sending_queue is full")
mockR.checkNumRequests(t, 1)
}

func TestQueuedRetryPersistenceEnabled(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg.PersistentStorageEnabled = true // enable persistence
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())

var extensions = map[config.ComponentID]component.Extension{
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{},
}
host := &mockHost{ext: extensions}

// we start correctly with a file storage extension
require.NoError(t, be.Start(context.Background(), host))
require.NoError(t, be.Shutdown(context.Background()))
}

func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
storageError := errors.New("could not get storage client")
tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

qCfg := NewDefaultQueueSettings()
qCfg.PersistentStorageEnabled = true // enable persistence
rCfg := NewDefaultRetrySettings()
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())

var extensions = map[config.ComponentID]component.Extension{
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError},
}
host := &mockHost{ext: extensions}

// we fail to start if we get an error creating the storage client
require.Error(t, be.Start(context.Background(), host), "could not get storage client")
}