Skip to content

Commit 1455326

Browse files
author
Mikołaj Świątek
committed
Require the storage to be explicitly set for persistent queue
1 parent 468a12a commit 1455326

File tree

4 files changed

+60
-42
lines changed

4 files changed

+60
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 🛑 Breaking changes 🛑
66

7+
- Require the storage to be explicitly set for the (experimental) persistent queue (#5784)
78
- Remove deprecated funcs/types from service related to `Config` (#5755)
89
- Change`confighttp.ToClient` to accept a `component.Host` (#5737)
910
- Remove deprecated funcs from pdata related to mutable slices (#5754)

exporter/exporterhelper/README.md

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,13 @@ The following configuration options can be modified:
3636
With this build tag set, additional configuration option can be enabled:
3737

3838
- `sending_queue`
39-
- `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
39+
- `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue
4040
(note, `enable_unstable` build tag needs to be enabled first, see below for more details)
4141

4242
The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which,
4343
similarly as for in-memory buffering, defaults to 5000 batches).
4444

45-
When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using
46-
[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage).
47-
If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and
48-
the exporting is continued.
45+
When persistent queue is enabled, the batches are being buffered using the provided storage extension - [filestorage] is a popular and safe choice. If the collector instance is killed while having some items in the persistent queue, on restart the items will be be picked and the exporting is continued.
4946

5047
```
5148
┌─Consumer #1─┐
@@ -93,9 +90,9 @@ exporters:
9390
otlp:
9491
endpoint: <ENDPOINT>
9592
sending_queue:
96-
persistent_storage_enabled: true
93+
storage: file_storage/otc
9794
extensions:
98-
file_storage:
95+
file_storage/otc:
9996
directory: /var/lib/storage/otc
10097
timeout: 10s
10198
service:

exporter/exporterhelper/queued_retry_experimental.go

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ type QueueSettings struct {
4444
NumConsumers int `mapstructure:"num_consumers"`
4545
// QueueSize is the maximum number of batches allowed in queue at a given time.
4646
QueueSize int `mapstructure:"queue_size"`
47-
// PersistentStorageEnabled describes whether persistence via a file storage extension is enabled
48-
PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"`
47+
// StorageID if not empty, enables the persistent storage and uses the component specified
48+
// as a storage extension for the persistent queue
49+
StorageID *config.ComponentID `mapstructure:"storage"`
4950
}
5051

5152
// NewDefaultQueueSettings returns the default settings for QueueSettings.
@@ -57,8 +58,7 @@ func NewDefaultQueueSettings() QueueSettings {
5758
// This is a pretty decent value for production.
5859
// User should calculate this from the perspective of how many seconds to buffer in case of a backend outage,
5960
// multiply that by the number of requests per seconds.
60-
QueueSize: 5000,
61-
PersistentStorageEnabled: false,
61+
QueueSize: 5000,
6262
}
6363
}
6464

@@ -76,8 +76,8 @@ func (qCfg *QueueSettings) Validate() error {
7676
}
7777

7878
var (
79-
errNoStorageClient = errors.New("no storage client extension found")
80-
errMultipleStorageClients = errors.New("multiple storage extensions found")
79+
errNoStorageClient = errors.New("no storage client extension found")
80+
errWrongExtensionType = errors.New("requested extension is not a storage extension")
8181
)
8282

8383
type queuedRetrySender struct {
@@ -120,46 +120,55 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu
120120
onTemporaryFailure: qrs.onTemporaryFailure,
121121
}
122122

123-
if !qCfg.PersistentStorageEnabled {
123+
if qCfg.StorageID == nil {
124124
qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {})
125125
}
126126
// The Persistent Queue is initialized separately as it needs extra information about the component
127127

128128
return qrs
129129
}
130130

131-
func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) {
132-
var storageExtension storage.Extension
133-
for _, ext := range host.GetExtensions() {
134-
if se, ok := ext.(storage.Extension); ok {
135-
if storageExtension != nil {
136-
return nil, errMultipleStorageClients
131+
func (qCfg *QueueSettings) getStorageExtension(logger *zap.Logger, extensions map[config.ComponentID]component.Extension) (storage.Extension, error) {
132+
if qCfg.StorageID != nil {
133+
if ext, found := extensions[*qCfg.StorageID]; found {
134+
if storageExt, ok := ext.(storage.Extension); ok {
135+
return storageExt, nil
137136
}
138-
storageExtension = se
137+
return nil, errWrongExtensionType
138+
} else {
139+
return nil, errNoStorageClient
139140
}
140141
}
141142

142-
if storageExtension == nil {
143+
return nil, nil
144+
}
145+
146+
func (qCfg *QueueSettings) toStorageClient(ctx context.Context, logger *zap.Logger, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) {
147+
extension, err := qCfg.getStorageExtension(logger, host.GetExtensions())
148+
if err != nil {
149+
return nil, err
150+
}
151+
if extension == nil {
143152
return nil, errNoStorageClient
144153
}
145154

146-
client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal))
155+
client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
147156
if err != nil {
148157
return nil, err
149158
}
150159

151-
return &client, err
160+
return client, err
152161
}
153162

154163
// initializePersistentQueue uses extra information for initialization available from component.Host
155164
func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error {
156-
if qrs.cfg.PersistentStorageEnabled {
157-
storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal)
165+
if qrs.cfg.StorageID != nil {
166+
storageClient, err := qrs.cfg.toStorageClient(ctx, qrs.logger, host, qrs.id, qrs.signal)
158167
if err != nil {
159168
return err
160169
}
161170

162-
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler)
171+
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler)
163172

164173
// TODO: this can be further exposed as a config param rather than relying on a type of queue
165174
qrs.requeuingEnabled = true

exporter/exporterhelper/queued_retry_experimental_test.go

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626

2727
"github.com/stretchr/testify/assert"
2828
"github.com/stretchr/testify/require"
29+
"go.uber.org/zap"
2930

3031
"go.opentelemetry.io/collector/component"
3132
"go.opentelemetry.io/collector/component/componenttest"
@@ -70,7 +71,7 @@ func TestGetRetrySettings(t *testing.T) {
7071
desc string
7172
storage storage.Extension
7273
numStorages int
73-
storageEnabled bool
74+
storageID string
7475
expectedError error
7576
getClientError error
7677
}{
@@ -80,21 +81,21 @@ func TestGetRetrySettings(t *testing.T) {
8081
expectedError: errNoStorageClient,
8182
},
8283
{
83-
desc: "obtain default storage extension",
84-
numStorages: 1,
85-
storageEnabled: true,
86-
expectedError: nil,
84+
desc: "obtain storage extension by name",
85+
numStorages: 2,
86+
storageID: "1",
87+
expectedError: nil,
8788
},
8889
{
89-
desc: "fail on obtaining default storage extension",
90-
numStorages: 2,
91-
storageEnabled: true,
92-
expectedError: errMultipleStorageClients,
90+
desc: "fail on not existing storage extension",
91+
numStorages: 2,
92+
storageID: "100",
93+
expectedError: errNoStorageClient,
9394
},
9495
{
9596
desc: "fail on error getting storage client from extension",
9697
numStorages: 1,
97-
storageEnabled: true,
98+
storageID: "0",
9899
expectedError: getStorageClientError,
99100
getClientError: getStorageClientError,
100101
},
@@ -103,6 +104,14 @@ func TestGetRetrySettings(t *testing.T) {
103104
for _, tC := range testCases {
104105
t.Run(tC.desc, func(t *testing.T) {
105106
// prepare
107+
cfg := &QueueSettings{
108+
Enabled: true,
109+
}
110+
if tC.storageID != "" {
111+
compID := config.NewComponentIDWithName("file_storage", tC.storageID)
112+
cfg.StorageID = &compID
113+
}
114+
106115
var extensions = map[config.ComponentID]component.Extension{}
107116
for i := 0; i < tC.numStorages; i++ {
108117
extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError}
@@ -111,7 +120,7 @@ func TestGetRetrySettings(t *testing.T) {
111120
ownerID := config.NewComponentID("foo_exporter")
112121

113122
// execute
114-
client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType)
123+
client, err := cfg.toStorageClient(context.Background(), zap.NewNop(), host, ownerID, config.TracesDataType)
115124

116125
// verify
117126
if tC.expectedError != nil {
@@ -182,12 +191,13 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
182191
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
183192

184193
qCfg := NewDefaultQueueSettings()
185-
qCfg.PersistentStorageEnabled = true // enable persistence
194+
storageID := config.NewComponentIDWithName("file_storage", "storage")
195+
qCfg.StorageID = &storageID // enable persistence
186196
rCfg := NewDefaultRetrySettings()
187197
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
188198

189199
var extensions = map[config.ComponentID]component.Extension{
190-
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{},
200+
storageID: &mockStorageExtension{},
191201
}
192202
host := &mockHost{ext: extensions}
193203

@@ -203,12 +213,13 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
203213
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
204214

205215
qCfg := NewDefaultQueueSettings()
206-
qCfg.PersistentStorageEnabled = true // enable persistence
216+
storageID := config.NewComponentIDWithName("file_storage", "storage")
217+
qCfg.StorageID = &storageID // enable persistence
207218
rCfg := NewDefaultRetrySettings()
208219
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
209220

210221
var extensions = map[config.ComponentID]component.Extension{
211-
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError},
222+
storageID: &mockStorageExtension{GetClientError: storageError},
212223
}
213224
host := &mockHost{ext: extensions}
214225

0 commit comments

Comments
 (0)