Skip to content

Commit 3bb6860

Browse files
Mikołaj Świątekbogdandrutu
andauthored
Require the storage to be explicitly set for persistent queue (#5784)
Co-authored-by: Bogdan Drutu <[email protected]>
1 parent d146f29 commit 3bb6860

File tree

4 files changed

+76
-48
lines changed

4 files changed

+76
-48
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 🛑 Breaking changes 🛑
66

7+
- Require the storage to be explicitly set for the (experimental) persistent queue (#5784)
78
- Remove deprecated `confighttp.HTTPClientSettings.ToClientWithHost` (#5803)
89
- Remove deprecated component stability helpers (#5802):
910
- `component.WithTracesExporterAndStabilityLevel`

exporter/exporterhelper/README.md

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,13 @@ The following configuration options can be modified:
3636
With this build tag set, additional configuration option can be enabled:
3737

3838
- `sending_queue`
39-
- `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
39+
- `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue
4040
(note, `enable_unstable` build tag needs to be enabled first, see below for more details)
4141

4242
The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which,
4343
similarly as for in-memory buffering, defaults to 5000 batches).
4444

45-
When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using
46-
[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage).
47-
If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and
48-
the exporting is continued.
45+
When persistent queue is enabled, the batches are being buffered using the provided storage extension - [filestorage] is a popular and safe choice. If the collector instance is killed while having some items in the persistent queue, on restart the items will be be picked and the exporting is continued.
4946

5047
```
5148
┌─Consumer #1─┐
@@ -93,9 +90,9 @@ exporters:
9390
otlp:
9491
endpoint: <ENDPOINT>
9592
sending_queue:
96-
persistent_storage_enabled: true
93+
storage: file_storage/otc
9794
extensions:
98-
file_storage:
95+
file_storage/otc:
9996
directory: /var/lib/storage/otc
10097
timeout: 10s
10198
service:
@@ -112,3 +109,5 @@ service:
112109
exporters: [otlp]
113110
114111
```
112+
113+
[filestorage]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage

exporter/exporterhelper/queued_retry_experimental.go

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ type QueueSettings struct {
4444
NumConsumers int `mapstructure:"num_consumers"`
4545
// QueueSize is the maximum number of batches allowed in queue at a given time.
4646
QueueSize int `mapstructure:"queue_size"`
47-
// PersistentStorageEnabled describes whether persistence via a file storage extension is enabled
48-
PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"`
47+
// StorageID if not empty, enables the persistent storage and uses the component specified
48+
// as a storage extension for the persistent queue
49+
StorageID *config.ComponentID `mapstructure:"storage"`
4950
}
5051

5152
// NewDefaultQueueSettings returns the default settings for QueueSettings.
@@ -57,8 +58,7 @@ func NewDefaultQueueSettings() QueueSettings {
5758
// This is a pretty decent value for production.
5859
// User should calculate this from the perspective of how many seconds to buffer in case of a backend outage,
5960
// multiply that by the number of requests per seconds.
60-
QueueSize: 5000,
61-
PersistentStorageEnabled: false,
61+
QueueSize: 5000,
6262
}
6363
}
6464

@@ -76,8 +76,8 @@ func (qCfg *QueueSettings) Validate() error {
7676
}
7777

7878
var (
79-
errNoStorageClient = errors.New("no storage client extension found")
80-
errMultipleStorageClients = errors.New("multiple storage extensions found")
79+
errNoStorageClient = errors.New("no storage client extension found")
80+
errWrongExtensionType = errors.New("requested extension is not a storage extension")
8181
)
8282

8383
type queuedRetrySender struct {
@@ -120,46 +120,47 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu
120120
onTemporaryFailure: qrs.onTemporaryFailure,
121121
}
122122

123-
if !qCfg.PersistentStorageEnabled {
123+
if qCfg.StorageID == nil {
124124
qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {})
125125
}
126126
// The Persistent Queue is initialized separately as it needs extra information about the component
127127

128128
return qrs
129129
}
130130

131-
func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) {
132-
var storageExtension storage.Extension
133-
for _, ext := range host.GetExtensions() {
134-
if se, ok := ext.(storage.Extension); ok {
135-
if storageExtension != nil {
136-
return nil, errMultipleStorageClients
137-
}
138-
storageExtension = se
131+
func getStorageExtension(extensions map[config.ComponentID]component.Extension, storageID config.ComponentID) (storage.Extension, error) {
132+
if ext, found := extensions[storageID]; found {
133+
if storageExt, ok := ext.(storage.Extension); ok {
134+
return storageExt, nil
139135
}
136+
return nil, errWrongExtensionType
140137
}
138+
return nil, errNoStorageClient
139+
}
141140

142-
if storageExtension == nil {
143-
return nil, errNoStorageClient
141+
func toStorageClient(ctx context.Context, storageID config.ComponentID, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) {
142+
extension, err := getStorageExtension(host.GetExtensions(), storageID)
143+
if err != nil {
144+
return nil, err
144145
}
145146

146-
client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal))
147+
client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
147148
if err != nil {
148149
return nil, err
149150
}
150151

151-
return &client, err
152+
return client, err
152153
}
153154

154155
// initializePersistentQueue uses extra information for initialization available from component.Host
155156
func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error {
156-
if qrs.cfg.PersistentStorageEnabled {
157-
storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal)
157+
if qrs.cfg.StorageID != nil {
158+
storageClient, err := toStorageClient(ctx, *qrs.cfg.StorageID, host, qrs.id, qrs.signal)
158159
if err != nil {
159160
return err
160161
}
161162

162-
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler)
163+
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler)
163164

164165
// TODO: this can be further exposed as a config param rather than relying on a type of queue
165166
qrs.requeuingEnabled = true

exporter/exporterhelper/queued_retry_experimental_test.go

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -70,39 +70,41 @@ func TestGetRetrySettings(t *testing.T) {
7070
desc string
7171
storage storage.Extension
7272
numStorages int
73-
storageEnabled bool
73+
storageIndex int
7474
expectedError error
7575
getClientError error
7676
}{
7777
{
78-
desc: "no storage selected",
79-
numStorages: 0,
80-
expectedError: errNoStorageClient,
78+
desc: "obtain storage extension by name",
79+
numStorages: 2,
80+
storageIndex: 0,
81+
expectedError: nil,
8182
},
8283
{
83-
desc: "obtain default storage extension",
84-
numStorages: 1,
85-
storageEnabled: true,
86-
expectedError: nil,
84+
desc: "fail on not existing storage extension",
85+
numStorages: 2,
86+
storageIndex: 100,
87+
expectedError: errNoStorageClient,
8788
},
8889
{
89-
desc: "fail on obtaining default storage extension",
90-
numStorages: 2,
91-
storageEnabled: true,
92-
expectedError: errMultipleStorageClients,
90+
desc: "invalid extension type",
91+
numStorages: 2,
92+
storageIndex: 100,
93+
expectedError: errNoStorageClient,
9394
},
9495
{
9596
desc: "fail on error getting storage client from extension",
9697
numStorages: 1,
97-
storageEnabled: true,
98+
storageIndex: 0,
9899
expectedError: getStorageClientError,
99100
getClientError: getStorageClientError,
100101
},
101102
}
102103

103104
for _, tC := range testCases {
104105
t.Run(tC.desc, func(t *testing.T) {
105-
// prepare
106+
storageID := config.NewComponentIDWithName("file_storage", strconv.Itoa(tC.storageIndex))
107+
106108
var extensions = map[config.ComponentID]component.Extension{}
107109
for i := 0; i < tC.numStorages; i++ {
108110
extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError}
@@ -111,7 +113,7 @@ func TestGetRetrySettings(t *testing.T) {
111113
ownerID := config.NewComponentID("foo_exporter")
112114

113115
// execute
114-
client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType)
116+
client, err := toStorageClient(context.Background(), storageID, host, ownerID, config.TracesDataType)
115117

116118
// verify
117119
if tC.expectedError != nil {
@@ -125,6 +127,29 @@ func TestGetRetrySettings(t *testing.T) {
125127
}
126128
}
127129

130+
func TestInvalidStorageExtensionType(t *testing.T) {
131+
storageID := config.NewComponentIDWithName("extension", "extension")
132+
133+
// make a test extension
134+
factory := componenttest.NewNopExtensionFactory()
135+
extConfig := factory.CreateDefaultConfig()
136+
settings := componenttest.NewNopExtensionCreateSettings()
137+
extension, err := factory.CreateExtension(context.Background(), settings, extConfig)
138+
assert.NoError(t, err)
139+
var extensions = map[config.ComponentID]component.Extension{
140+
storageID: extension,
141+
}
142+
host := &mockHost{ext: extensions}
143+
ownerID := config.NewComponentID("foo_exporter")
144+
145+
// execute
146+
client, err := toStorageClient(context.Background(), storageID, host, ownerID, config.TracesDataType)
147+
148+
// we should get an error about the extension type
149+
assert.ErrorIs(t, err, errWrongExtensionType)
150+
assert.Nil(t, client)
151+
}
152+
128153
// if requeueing is enabled, we eventually retry even if we failed at first
129154
func TestQueuedRetry_RequeuingEnabled(t *testing.T) {
130155
qCfg := NewDefaultQueueSettings()
@@ -182,12 +207,13 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
182207
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
183208

184209
qCfg := NewDefaultQueueSettings()
185-
qCfg.PersistentStorageEnabled = true // enable persistence
210+
storageID := config.NewComponentIDWithName("file_storage", "storage")
211+
qCfg.StorageID = &storageID // enable persistence
186212
rCfg := NewDefaultRetrySettings()
187213
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
188214

189215
var extensions = map[config.ComponentID]component.Extension{
190-
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{},
216+
storageID: &mockStorageExtension{},
191217
}
192218
host := &mockHost{ext: extensions}
193219

@@ -203,12 +229,13 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
203229
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
204230

205231
qCfg := NewDefaultQueueSettings()
206-
qCfg.PersistentStorageEnabled = true // enable persistence
232+
storageID := config.NewComponentIDWithName("file_storage", "storage")
233+
qCfg.StorageID = &storageID // enable persistence
207234
rCfg := NewDefaultRetrySettings()
208235
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
209236

210237
var extensions = map[config.ComponentID]component.Extension{
211-
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError},
238+
storageID: &mockStorageExtension{GetClientError: storageError},
212239
}
213240
host := &mockHost{ext: extensions}
214241

0 commit comments

Comments
 (0)