Skip to content

Commit f8c9aff

Browse files
author
Mikołaj Świątek
committed
Require the storage to be explicitly set for persistent queue
1 parent 468a12a commit f8c9aff

File tree

4 files changed

+58
-42
lines changed

4 files changed

+58
-42
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### 🛑 Breaking changes 🛑
66

7+
- Require the storage to be explicitly set for the (experimental) persistent queue (#5784)
78
- Remove deprecated funcs/types from service related to `Config` (#5755)
89
- Change`confighttp.ToClient` to accept a `component.Host` (#5737)
910
- Remove deprecated funcs from pdata related to mutable slices (#5754)

exporter/exporterhelper/README.md

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,13 @@ The following configuration options can be modified:
3636
With this build tag set, additional configuration option can be enabled:
3737

3838
- `sending_queue`
39-
- `persistent_storage_enabled` (default = false): When set, enables persistence via a file storage extension
39+
- `storage` (default = none): When set, enables persistence and uses the component specified as a storage extension for the persistent queue
4040
(note, `enable_unstable` build tag needs to be enabled first, see below for more details)
4141

4242
The maximum number of batches stored to disk can be controlled using `sending_queue.queue_size` parameter (which,
4343
similarly as for in-memory buffering, defaults to 5000 batches).
4444

45-
When `persistent_storage_enabled` is set to true, the queue is being buffered to disk using
46-
[file storage extension](https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/extension/storage/filestorage).
47-
If collector instance is killed while having some items in the persistent queue, on restart the items are being picked and
48-
the exporting is continued.
45+
When persistent queue is enabled, the batches are being buffered using the provided storage extension - [filestorage] is a popular and safe choice. If the collector instance is killed while having some items in the persistent queue, on restart the items will be be picked and the exporting is continued.
4946

5047
```
5148
┌─Consumer #1─┐
@@ -93,9 +90,9 @@ exporters:
9390
otlp:
9491
endpoint: <ENDPOINT>
9592
sending_queue:
96-
persistent_storage_enabled: true
93+
storage: file_storage/otc
9794
extensions:
98-
file_storage:
95+
file_storage/otc:
9996
directory: /var/lib/storage/otc
10097
timeout: 10s
10198
service:

exporter/exporterhelper/queued_retry_experimental.go

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ type QueueSettings struct {
4444
NumConsumers int `mapstructure:"num_consumers"`
4545
// QueueSize is the maximum number of batches allowed in queue at a given time.
4646
QueueSize int `mapstructure:"queue_size"`
47-
// PersistentStorageEnabled describes whether persistence via a file storage extension is enabled
48-
PersistentStorageEnabled bool `mapstructure:"persistent_storage_enabled"`
47+
// StorageID if not empty, enables the persistent storage and uses the component specified
48+
// as a storage extension for the persistent queue
49+
StorageID *config.ComponentID `mapstructure:"storage"`
4950
}
5051

5152
// NewDefaultQueueSettings returns the default settings for QueueSettings.
@@ -57,8 +58,7 @@ func NewDefaultQueueSettings() QueueSettings {
5758
// This is a pretty decent value for production.
5859
// User should calculate this from the perspective of how many seconds to buffer in case of a backend outage,
5960
// multiply that by the number of requests per seconds.
60-
QueueSize: 5000,
61-
PersistentStorageEnabled: false,
61+
QueueSize: 5000,
6262
}
6363
}
6464

@@ -76,8 +76,8 @@ func (qCfg *QueueSettings) Validate() error {
7676
}
7777

7878
var (
79-
errNoStorageClient = errors.New("no storage client extension found")
80-
errMultipleStorageClients = errors.New("multiple storage extensions found")
79+
errNoStorageClient = errors.New("no storage client extension found")
80+
errWrongExtensionType = errors.New("requested extension is not a storage extension")
8181
)
8282

8383
type queuedRetrySender struct {
@@ -120,46 +120,54 @@ func newQueuedRetrySender(id config.ComponentID, signal config.DataType, qCfg Qu
120120
onTemporaryFailure: qrs.onTemporaryFailure,
121121
}
122122

123-
if !qCfg.PersistentStorageEnabled {
123+
if qCfg.StorageID == nil {
124124
qrs.queue = internal.NewBoundedMemoryQueue(qrs.cfg.QueueSize, func(item interface{}) {})
125125
}
126126
// The Persistent Queue is initialized separately as it needs extra information about the component
127127

128128
return qrs
129129
}
130130

131-
func getStorageClient(ctx context.Context, host component.Host, id config.ComponentID, signal config.DataType) (*storage.Client, error) {
132-
var storageExtension storage.Extension
133-
for _, ext := range host.GetExtensions() {
134-
if se, ok := ext.(storage.Extension); ok {
135-
if storageExtension != nil {
136-
return nil, errMultipleStorageClients
131+
func (qCfg *QueueSettings) getStorageExtension(extensions map[config.ComponentID]component.Extension) (storage.Extension, error) {
132+
if qCfg.StorageID != nil {
133+
if ext, found := extensions[*qCfg.StorageID]; found {
134+
if storageExt, ok := ext.(storage.Extension); ok {
135+
return storageExt, nil
137136
}
138-
storageExtension = se
137+
return nil, errWrongExtensionType
139138
}
139+
return nil, errNoStorageClient
140140
}
141141

142-
if storageExtension == nil {
142+
return nil, nil
143+
}
144+
145+
func (qCfg *QueueSettings) toStorageClient(ctx context.Context, host component.Host, ownerID config.ComponentID, signal config.DataType) (storage.Client, error) {
146+
extension, err := qCfg.getStorageExtension(host.GetExtensions())
147+
if err != nil {
148+
return nil, err
149+
}
150+
if extension == nil {
143151
return nil, errNoStorageClient
144152
}
145153

146-
client, err := storageExtension.GetClient(ctx, component.KindExporter, id, string(signal))
154+
client, err := extension.GetClient(ctx, component.KindExporter, ownerID, string(signal))
147155
if err != nil {
148156
return nil, err
149157
}
150158

151-
return &client, err
159+
return client, err
152160
}
153161

154162
// initializePersistentQueue uses extra information for initialization available from component.Host
155163
func (qrs *queuedRetrySender) initializePersistentQueue(ctx context.Context, host component.Host) error {
156-
if qrs.cfg.PersistentStorageEnabled {
157-
storageClient, err := getStorageClient(ctx, host, qrs.id, qrs.signal)
164+
if qrs.cfg.StorageID != nil {
165+
storageClient, err := qrs.cfg.toStorageClient(ctx, host, qrs.id, qrs.signal)
158166
if err != nil {
159167
return err
160168
}
161169

162-
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, *storageClient, qrs.requestUnmarshaler)
170+
qrs.queue = internal.NewPersistentQueue(ctx, qrs.fullName, qrs.signal, qrs.cfg.QueueSize, qrs.logger, storageClient, qrs.requestUnmarshaler)
163171

164172
// TODO: this can be further exposed as a config param rather than relying on a type of queue
165173
qrs.requeuingEnabled = true

exporter/exporterhelper/queued_retry_experimental_test.go

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func TestGetRetrySettings(t *testing.T) {
7070
desc string
7171
storage storage.Extension
7272
numStorages int
73-
storageEnabled bool
73+
storageID string
7474
expectedError error
7575
getClientError error
7676
}{
@@ -80,21 +80,21 @@ func TestGetRetrySettings(t *testing.T) {
8080
expectedError: errNoStorageClient,
8181
},
8282
{
83-
desc: "obtain default storage extension",
84-
numStorages: 1,
85-
storageEnabled: true,
86-
expectedError: nil,
83+
desc: "obtain storage extension by name",
84+
numStorages: 2,
85+
storageID: "1",
86+
expectedError: nil,
8787
},
8888
{
89-
desc: "fail on obtaining default storage extension",
90-
numStorages: 2,
91-
storageEnabled: true,
92-
expectedError: errMultipleStorageClients,
89+
desc: "fail on not existing storage extension",
90+
numStorages: 2,
91+
storageID: "100",
92+
expectedError: errNoStorageClient,
9393
},
9494
{
9595
desc: "fail on error getting storage client from extension",
9696
numStorages: 1,
97-
storageEnabled: true,
97+
storageID: "0",
9898
expectedError: getStorageClientError,
9999
getClientError: getStorageClientError,
100100
},
@@ -103,6 +103,14 @@ func TestGetRetrySettings(t *testing.T) {
103103
for _, tC := range testCases {
104104
t.Run(tC.desc, func(t *testing.T) {
105105
// prepare
106+
cfg := &QueueSettings{
107+
Enabled: true,
108+
}
109+
if tC.storageID != "" {
110+
compID := config.NewComponentIDWithName("file_storage", tC.storageID)
111+
cfg.StorageID = &compID
112+
}
113+
106114
var extensions = map[config.ComponentID]component.Extension{}
107115
for i := 0; i < tC.numStorages; i++ {
108116
extensions[config.NewComponentIDWithName("file_storage", strconv.Itoa(i))] = &mockStorageExtension{GetClientError: tC.getClientError}
@@ -111,7 +119,7 @@ func TestGetRetrySettings(t *testing.T) {
111119
ownerID := config.NewComponentID("foo_exporter")
112120

113121
// execute
114-
client, err := getStorageClient(context.Background(), host, ownerID, config.TracesDataType)
122+
client, err := cfg.toStorageClient(context.Background(), host, ownerID, config.TracesDataType)
115123

116124
// verify
117125
if tC.expectedError != nil {
@@ -182,12 +190,13 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) {
182190
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
183191

184192
qCfg := NewDefaultQueueSettings()
185-
qCfg.PersistentStorageEnabled = true // enable persistence
193+
storageID := config.NewComponentIDWithName("file_storage", "storage")
194+
qCfg.StorageID = &storageID // enable persistence
186195
rCfg := NewDefaultRetrySettings()
187196
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
188197

189198
var extensions = map[config.ComponentID]component.Extension{
190-
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{},
199+
storageID: &mockStorageExtension{},
191200
}
192201
host := &mockHost{ext: extensions}
193202

@@ -203,12 +212,13 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) {
203212
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })
204213

205214
qCfg := NewDefaultQueueSettings()
206-
qCfg.PersistentStorageEnabled = true // enable persistence
215+
storageID := config.NewComponentIDWithName("file_storage", "storage")
216+
qCfg.StorageID = &storageID // enable persistence
207217
rCfg := NewDefaultRetrySettings()
208218
be := newBaseExporter(&defaultExporterCfg, tt.ToExporterCreateSettings(), fromOptions(WithRetry(rCfg), WithQueue(qCfg)), "", nopRequestUnmarshaler())
209219

210220
var extensions = map[config.ComponentID]component.Extension{
211-
config.NewComponentIDWithName("file_storage", "storage"): &mockStorageExtension{GetClientError: storageError},
221+
storageID: &mockStorageExtension{GetClientError: storageError},
212222
}
213223
host := &mockHost{ext: extensions}
214224

0 commit comments

Comments
 (0)