Skip to content

Commit aba35e2

Browse files
committed
Use configoptional for exporterhelper
Signed-off-by: Bogdan Drutu <[email protected]>
1 parent 1046576 commit aba35e2

File tree

27 files changed

+202
-64
lines changed

27 files changed

+202
-64
lines changed

.chloggen/fix-todo-optional.yaml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: breaking
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
7+
component: exporterhelper
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Use configoptional for optional fields in exporterhelper
11+
12+
# One or more tracking issues or pull requests related to the change
13+
issues: [13345]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# Optional: The change log or logs in which this entry should be included.
21+
# e.g. '[user]' or '[user, api]'
22+
# Include 'user' if the change is relevant to end users.
23+
# Include 'api' if there is a change to a library API.
24+
# Default: '[user]'
25+
change_logs: [api]

exporter/debugexporter/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ require (
4242
github.com/pmezard/go-difflib v1.0.0 // indirect
4343
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
4444
go.opentelemetry.io/collector/client v1.35.0 // indirect
45+
go.opentelemetry.io/collector/config/configoptional v0.129.0 // indirect
4546
go.opentelemetry.io/collector/config/configretry v1.35.0 // indirect
4647
go.opentelemetry.io/collector/consumer/consumererror v0.129.0 // indirect
4748
go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.129.0 // indirect
@@ -131,3 +132,5 @@ replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telem
131132
replace go.opentelemetry.io/collector/client => ../../client
132133

133134
replace go.opentelemetry.io/collector/pdata/xpdata => ../../pdata/xpdata
135+
136+
replace go.opentelemetry.io/collector/config/configoptional => ../../config/configoptional

exporter/exporterhelper/internal/base_exporter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func NewBaseExporter(set exporter.Settings, signal pipeline.Signal, pusher sende
8282
return nil, err
8383
}
8484

85-
if be.queueCfg.Batch != nil {
85+
if be.queueCfg.Batch.HasValue() {
8686
// Batcher mutates the data.
8787
be.ConsumerOptions = append(be.ConsumerOptions, consumer.WithCapabilities(consumer.Capabilities{MutatesData: true}))
8888
}
@@ -213,7 +213,7 @@ func WithQueueBatch(cfg queuebatch.Config, set QueueBatchSettings[request.Reques
213213
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
214214
return nil
215215
}
216-
if cfg.StorageID != nil && set.Encoding == nil {
216+
if cfg.StorageID.HasValue() && set.Encoding == nil {
217217
return errors.New("`QueueBatchSettings.Encoding` must not be nil when persistent queue is enabled")
218218
}
219219
o.queueBatchSettings = set

exporter/exporterhelper/internal/base_exporter_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"go.opentelemetry.io/collector/component"
1717
"go.opentelemetry.io/collector/component/componenttest"
18+
"go.opentelemetry.io/collector/config/configoptional"
1819
"go.opentelemetry.io/collector/config/configretry"
1920
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
2021
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
@@ -52,8 +53,7 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
5253
require.Error(t, err)
5354

5455
qCfg := NewDefaultQueueConfig()
55-
storageID := component.NewID(component.MustNewType("test"))
56-
qCfg.StorageID = &storageID
56+
qCfg.StorageID = configoptional.Some(component.MustNewID("test"))
5757
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
5858
WithQueueBatchSettings(newFakeQueueBatch()),
5959
WithRetry(configretry.NewDefaultBackOffConfig()),

exporter/exporterhelper/internal/queue/persistent_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ func newPersistentQueue[T any](set Settings[T]) readableQueue[T] {
102102
activeSizer: set.activeSizer(),
103103
itemsSizer: set.ItemsSizer,
104104
bytesSizer: set.BytesSizer,
105-
storageID: *set.StorageID,
105+
storageID: *set.StorageID.Get(),
106106
id: set.ID,
107107
signal: set.Signal,
108108
blockOnOverflow: set.BlockOnOverflow,

exporter/exporterhelper/internal/queue/persistent_queue_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"go.opentelemetry.io/collector/component"
2222
"go.opentelemetry.io/collector/component/componenttest"
23+
"go.opentelemetry.io/collector/config/configoptional"
2324
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
2425
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest"
2526
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
@@ -231,8 +232,7 @@ func newSettings(sizerType request.SizerType, capacity int64) Settings[int64] {
231232

232233
func newSettingsWithStorage(sizerType request.SizerType, capacity int64) Settings[int64] {
233234
set := newSettings(sizerType, capacity)
234-
storageID := component.ID{}
235-
set.StorageID = &storageID
235+
set.StorageID = configoptional.Some(component.ID{})
236236
return set
237237
}
238238

@@ -513,8 +513,7 @@ func TestInvalidStorageExtensionType(t *testing.T) {
513513
}
514514

515515
func TestPersistentQueue_StopAfterBadStart(t *testing.T) {
516-
storageID := component.ID{}
517-
pq := newPersistentQueue[int64](Settings[int64]{StorageID: &storageID})
516+
pq := newPersistentQueue[int64](Settings[int64]{StorageID: configoptional.Some(component.ID{})})
518517
// verify that stopping a un-start/started w/error queue does not panic
519518
assert.NoError(t, pq.Shutdown(context.Background()))
520519
}

exporter/exporterhelper/internal/queue/queue.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99

1010
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/config/configoptional"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1213
"go.opentelemetry.io/collector/pipeline"
1314
)
@@ -63,7 +64,7 @@ type Settings[T any] struct {
6364
WaitForResult bool
6465
BlockOnOverflow bool
6566
Signal pipeline.Signal
66-
StorageID *component.ID
67+
StorageID configoptional.Optional[component.ID]
6768
Encoding Encoding[T]
6869
ID component.ID
6970
Telemetry component.TelemetrySettings
@@ -82,7 +83,7 @@ func (set *Settings[T]) activeSizer() request.Sizer[T] {
8283

8384
func NewQueue[T any](set Settings[T], next ConsumeFunc[T]) (Queue[T], error) {
8485
// Configure memory queue or persistent based on the config.
85-
if set.StorageID == nil {
86+
if !set.StorageID.HasValue() {
8687
return newAsyncQueue(newMemoryQueue[T](set), set.NumConsumers, next), nil
8788
}
8889
if set.ItemsSizer == nil {

exporter/exporterhelper/internal/queue_sender.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ func NewDefaultQueueConfig() queuebatch.Config {
3434
// This default is probably still too high, and may be adjusted further down in a future release
3535
QueueSize: 1_000,
3636
BlockOnOverflow: false,
37-
StorageID: nil,
38-
Batch: nil,
3937
}
4038
}
4139

exporter/exporterhelper/internal/queuebatch/batcher.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"context"
88

99
"go.opentelemetry.io/collector/component"
10+
"go.opentelemetry.io/collector/config/configoptional"
1011
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/queue"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1213
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender"
@@ -26,14 +27,14 @@ type batcherSettings[T any] struct {
2627
maxWorkers int
2728
}
2829

29-
func NewBatcher(cfg *BatchConfig, set batcherSettings[request.Request]) Batcher[request.Request] {
30-
if cfg == nil {
30+
func NewBatcher(cfg configoptional.Optional[BatchConfig], set batcherSettings[request.Request]) Batcher[request.Request] {
31+
if !cfg.HasValue() {
3132
return newDisabledBatcher[request.Request](set.next)
3233
}
3334

3435
if set.partitioner == nil {
35-
return newPartitionBatcher(*cfg, set.sizerType, set.sizer, newWorkerPool(set.maxWorkers), set.next)
36+
return newPartitionBatcher(*cfg.Get(), set.sizerType, set.sizer, newWorkerPool(set.maxWorkers), set.next)
3637
}
3738

38-
return newMultiBatcher(*cfg, set.sizerType, set.sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.next)
39+
return newMultiBatcher(*cfg.Get(), set.sizerType, set.sizer, newWorkerPool(set.maxWorkers), set.partitioner, set.next)
3940
}

exporter/exporterhelper/internal/queuebatch/config.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"go.opentelemetry.io/collector/component"
11+
"go.opentelemetry.io/collector/config/configoptional"
1112
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
1213
)
1314

@@ -31,10 +32,9 @@ type Config struct {
3132
// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
3233
BlockOnOverflow bool `mapstructure:"block_on_overflow"`
3334

34-
// StorageID if not empty, enables the persistent storage and uses the component specified
35+
// StorageID, if not empty, enables the persistent storage and uses the component specified
3536
// as a storage extension for the persistent queue.
36-
// TODO: This will be changed to Optional when available.
37-
StorageID *component.ID `mapstructure:"storage"`
37+
StorageID configoptional.Optional[component.ID] `mapstructure:"storage"`
3838

3939
// NumConsumers is the maximum number of concurrent consumers from the queue.
4040
// This applies across all different optional configurations from above (e.g. wait_for_result, blockOnOverflow, persistent, etc.).
@@ -43,8 +43,7 @@ type Config struct {
4343
NumConsumers int `mapstructure:"num_consumers"`
4444

4545
// BatchConfig it configures how the requests are consumed from the queue and batch together during consumption.
46-
// TODO: This will be changed to Optional when available.
47-
Batch *BatchConfig `mapstructure:"batch"`
46+
Batch configoptional.Optional[BatchConfig] `mapstructure:"batch"`
4847
}
4948

5049
// Validate checks if the Config is valid
@@ -62,18 +61,18 @@ func (cfg *Config) Validate() error {
6261
}
6362

6463
// Only support request sizer for persistent queue at this moment.
65-
if cfg.StorageID != nil && cfg.WaitForResult {
64+
if cfg.StorageID.HasValue() && cfg.WaitForResult {
6665
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
6766
}
6867

69-
if cfg.Batch != nil {
68+
if cfg.Batch.HasValue() {
7069
// Only support items or bytes sizer for batch at this moment.
7170
if cfg.Sizer != request.SizerTypeItems && cfg.Sizer != request.SizerTypeBytes {
7271
return errors.New("`batch` supports only `items` or `bytes` sizer")
7372
}
7473

7574
// Avoid situations where the queue is not able to hold any data.
76-
if cfg.Batch.MinSize > cfg.QueueSize {
75+
if cfg.Batch.Get().MinSize > cfg.QueueSize {
7776
return errors.New("`min_size` must be less than or equal to `queue_size`")
7877
}
7978
}

0 commit comments

Comments
 (0)