diff --git a/CHANGELOG.md b/CHANGELOG.md index c4128e3e236..dbfec61bcce 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ ### 🛑 Breaking changes 🛑 +- Require the storage to be explicitly set for the (experimental) persistent queue (#5784) - Remove deprecated `confighttp.HTTPClientSettings.ToClientWithHost` (#5803) - Remove deprecated component stability helpers (#5802): - `component.WithTracesExporterAndStabilityLevel` diff --git a/exporter/exporterhelper/README.md b/exporter/exporterhelper/README.md index 51bd4383096..5fbb9dd5731 100644 --- a/exporter/exporterhelper/README.md +++ b/exporter/exporterhelper/README.md @@ -36,16 +36,13 @@ The following configuration options can be modified: With this build tag set, additional configuration option can be enabled: - `sending_queue` - - `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension + - `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue (note, `enable_unstable` build tag needs to be enabled first, see below for more details) The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which, similarly as for in-memory buffering, defaults to 5000 batches). -When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using -[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage). -If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and -the exporting is continued. +When the persistent queue is enabled, the batches are buffered using the provided storage extension - [filestorage] is a popular and safe choice. 
If the collector instance is killed while having some items in the persistent queue, on restart the items will be picked up and the exporting is continued. ``` ┌─Consumer #1─┐ @@ -93,9 +90,9 @@ exporters: otlp: endpoint: sending_queue: - persistent_storage_enabled: true + storage: file_storage/otc extensions: - file_storage: + file_storage/otc: directory: /var/lib/storage/otc timeout: 10s service: @@ -112,3 +109,5 @@ service: exporters: [otlp] ``` + +[filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage diff --git a/exporter/exporterhelper/queued_retry_experimental.go b/exporter/exporterhelper/queued_retry_experimental.go index 983650b1305..773911c0396 100644 --- a/exporter/exporterhelper/queued_retry_experimental.go +++ b/exporter/exporterhelper/queued_retry_experimental.go @@ -44,8 +44,9 @@ type QueueSettings struct { NumConsumers int `mapstructure:"num_consumers"` // QueueSize is the maximum number of batches allowed in queue at a given time. QueueSize int `mapstructure:"queue_size"` - // PersistentStorageEnabled describes whether persistence via a file storage extension is enabled - PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"` + // StorageID if not empty, enables the persistent storage and uses the component specified + // as a storage extension for the persistent queue + StorageID *config.ComponentID `mapstructure:"storage"` } // NewDefaultQueueSettings returns the default settings for QueueSettings. @@ -57,8 +58,7 @@ func NewDefaultQueueSettings() QueueSettings { // This is a pretty decent value for production. // User should calculate this from the perspective of how many seconds to buffer in case of a backend outage, // multiply that by the number of requests per seconds. 
- QueueSize: 5000, - PersistentStorageEnabled: false, + QueueSize: 5000, } } @@ -76,8 +76,8 @@ func (qCfg *QueueSettings) Validate() error { } var ( - errNoStorageClient = errors.New("no storage client extension found") - errMultipleStorageClients = errors.New("multiple storage extensions found") + errNoStorageClient = errors.New("no storage client extension found") + errWrongExtensionType = errors.New("requested extension is not a storage extension") ) type queuedRetrySender struct { @@ -120,7 +120,7 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu onTemporaryFailure: qrs.onTemporaryFailure, } - if !qCfg.PersistentStorageEnabled { + if qCfg.StorageID == nil { qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {}) } // The Persistent Queue is initialized separately as it needs extra information about the component @@ -128,38 +128,39 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu return qrs } -func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) { - var storageExtension storage.Extension - for _, ext := range host.GetExtensions() { - if se, ok := ext.(storage.Extension); ok { - if storageExtension != nil { - return nil, errMultipleStorageClients - } - storageExtension = se +func getStorageExtension(extensions map[config.ComponentID]component.Extension, storageID config.ComponentID) (storage.Extension, error) { + if ext, found := extensions[storageID]; found { + if storageExt, ok := ext.(storage.Extension); ok { + return storageExt, nil } + return nil, errWrongExtensionType } + return nil, errNoStorageClient +} - if storageExtension == nil { - return nil, errNoStorageClient +func toStorageClient(ctx context.Context, storageID config.ComponentID, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) { + extension, err := 
getStorageExtension(host.GetExtensions(), storageID) + if err != nil { + return nil, err } - client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal)) + client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal)) if err != nil { return nil, err } - return &client, err + return client, err } // initializePersistentQueue uses extra information for initialization available from component.Host func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error { - if qrs.cfg.PersistentStorageEnabled { - storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal) + if qrs.cfg.StorageID != nil { + storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal) if err != nil { return err } - qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler) + qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler) // TODO: this can be further exposed as a config param rather than relying on a type of queue qrs.requeuingEnabled = true diff --git a/exporter/exporterhelper/queued_retry_experimental_test.go b/exporter/exporterhelper/queued_retry_experimental_test.go index f8c80083011..9deb641b34a 100644 --- a/exporter/exporterhelper/queued_retry_experimental_test.go +++ b/exporter/exporterhelper/queued_retry_experimental_test.go @@ -70,31 +70,32 @@ func TestGetRetrySettings(t *testing.T) { desc string storage storage.Extension numStorages int - storageEnabled bool + storageIndex int expectedError error getClientError error }{ { - desc: "no storage selected", - numStorages: 0, - expectedError: errNoStorageClient, + desc: "obtain storage extension by name", + numStorages: 2, + storageIndex: 0, + expectedError: nil, }, { - desc: "obtain default storage extension", - numStorages: 1, - 
storageEnabled: true, - expectedError: nil, + desc: "fail on not existing storage extension", + numStorages: 2, + storageIndex: 100, + expectedError: errNoStorageClient, }, { - desc: "fail on obtaining default storage extension", - numStorages: 2, - storageEnabled: true, - expectedError: errMultipleStorageClients, + desc: "invalid extension type", + numStorages: 2, + storageIndex: 100, + expectedError: errNoStorageClient, }, { desc: "fail on error getting storage client from extension", numStorages: 1, - storageEnabled: true, + storageIndex: 0, expectedError: getStorageClientError, getClientError: getStorageClientError, }, @@ -102,7 +103,8 @@ func TestGetRetrySettings(t *testing.T) { for _, tC := range testCases { t.Run(tC.desc, func(t *testing.T) { - // prepare + storageID := config.NewComponentIDWithName("file_storage", strconv.Itoa(tC.storageIndex)) + var extensions = map[config.ComponentID]component.Extension{} for i := 0; i < tC.numStorages; i++ { extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError} @@ -111,7 +113,7 @@ func TestGetRetrySettings(t *testing.T) { ownerID := config.NewComponentID("foo_exporter") // execute - client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType) + client, err := toStorageClient(context.Background(), storageID, host, ownerID, config.TracesDataType) // verify if tC.expectedError != nil { @@ -125,6 +127,29 @@ func TestGetRetrySettings(t *testing.T) { } } +func TestInvalidStorageExtensionType(t *testing.T) { + storageID := config.NewComponentIDWithName("extension", "extension") + + // make a test extension + factory := componenttest.NewNopExtensionFactory() + extConfig := factory.CreateDefaultConfig() + settings := componenttest.NewNopExtensionCreateSettings() + extension, err := factory.CreateExtension(context.Background(), settings, extConfig) + assert.NoError(t, err) + var extensions = 
map[config.ComponentID]component.Extension{ + storageID: extension, + } + host := &mockHost{ext: extensions} + ownerID := config.NewComponentID("foo_exporter") + + // execute + client, err := toStorageClient(context.Background(), storageID, host, ownerID, config.TracesDataType) + + // we should get an error about the extension type + assert.ErrorIs(t, err, errWrongExtensionType) + assert.Nil(t, client) +} + // if requeueing is enabled, we eventually retry even if we failed at first func TestQueuedRetry_RequeuingEnabled(t *testing.T) { qCfg := NewDefaultQueueSettings() @@ -182,12 +207,13 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) qCfg := NewDefaultQueueSettings() - qCfg.PersistentStorageEnabled = true // enable persistence + storageID := config.NewComponentIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) var extensions = map[config.ComponentID]component.Extension{ - config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{}, + storageID: &mockStorageExtension{}, } host := &mockHost{ext: extensions} @@ -203,12 +229,13 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) qCfg := NewDefaultQueueSettings() - qCfg.PersistentStorageEnabled = true // enable persistence + storageID := config.NewComponentIDWithName("file_storage", "storage") + qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler()) var extensions = map[config.ComponentID]component.Extension{ - 
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError}, + storageID: &mockStorageExtension{GetClientError: storageError}, } host := &mockHost{ext: extensions}