Changes from all commits (32 commits)
- 485e18e [processor/resourcedetection] Add support for dynamic refresh resourc… (paulojmdias, Sep 15, 2025)
- 692f5bb Merge branch 'main' into feat/42663 (paulojmdias, Sep 15, 2025)
- b7a3eb9 Merge branch 'main' into feat/42663 (paulojmdias, Sep 17, 2025)
- c72541b Merge branch 'main' into feat/42663 (paulojmdias, Sep 18, 2025)
- 1e1b611 feat: improve and update README.md (paulojmdias, Sep 18, 2025)
- 7749eec Merge branch 'main' into feat/42663 (paulojmdias, Sep 19, 2025)
- e726e61 Merge branch 'main' into feat/42663 (paulojmdias, Sep 23, 2025)
- cb5ec38 Merge branch 'main' into feat/42663 (paulojmdias, Sep 23, 2025)
- 0eee20d Merge branch 'main' into feat/42663 (paulojmdias, Sep 25, 2025)
- 97bff0b Merge branch 'main' into feat/42663 (paulojmdias, Oct 10, 2025)
- f9bf944 Merge branch 'main' of github.com:paulojmdias/opentelemetry-collector… (paulojmdias, Oct 10, 2025)
- 000f750 chore: update changelog (paulojmdias, Oct 10, 2025)
- ad53dff Merge branch 'feat/42663' of github.com:paulojmdias/opentelemetry-col… (paulojmdias, Oct 10, 2025)
- 4944302 chore: update README.md (paulojmdias, Oct 10, 2025)
- 90b025f Merge branch 'main' into feat/42663 (paulojmdias, Oct 10, 2025)
- c71a546 Merge branch 'main' into feat/42663 (paulojmdias, Oct 14, 2025)
- a9cf1db Merge branch 'main' into feat/42663 (paulojmdias, Oct 17, 2025)
- 857f654 Merge branch 'main' into feat/42663 (paulojmdias, Oct 19, 2025)
- 2227df6 Merge branch 'main' into feat/42663 (paulojmdias, Oct 20, 2025)
- ec6a47c Merge branch 'main' into feat/42663 (paulojmdias, Oct 20, 2025)
- 452ff69 Merge branch 'main' into feat/42663 (paulojmdias, Oct 27, 2025)
- ce8e56d Merge branch 'main' into feat/42663 (paulojmdias, Oct 31, 2025)
- da4017d feat: improve refresh using atomic.Pointer (paulojmdias, Oct 31, 2025)
- 4341628 Merge branch 'main' into feat/42663 (paulojmdias, Nov 1, 2025)
- ff03fc0 feat: improve keep of last successfully snapshot (paulojmdias, Nov 4, 2025)
- f4ea20c Merge branch 'main' of github.com:paulojmdias/opentelemetry-collector… (paulojmdias, Nov 5, 2025)
- 78b7de4 feat: resourceResult improvements (paulojmdias, Nov 5, 2025)
- d6ab45d Merge branch 'main' into feat/42663 (paulojmdias, Nov 5, 2025)
- a4704ea Merge branch 'main' into feat/42663 (paulojmdias, Nov 6, 2025)
- b2321fd feat: improvements (paulojmdias, Nov 6, 2025)
- b7c7d90 Merge branch 'feat/42663' of github.com:paulojmdias/opentelemetry-col… (paulojmdias, Nov 6, 2025)
- 3fd23b3 Merge branch 'main' into feat/42663 (paulojmdias, Nov 6, 2025)
27 changes: 27 additions & 0 deletions .chloggen/feat_42663.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "processor/resourcedetection"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add support for dynamic refresh resource attributes with refresh_interval parameter"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [42663]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
14 changes: 14 additions & 0 deletions processor/resourcedetectionprocessor/README.md
@@ -831,6 +831,8 @@ processors:
detectors: [ <string> ]
# determines if existing resource attributes should be overridden or preserved, defaults to true
override: <bool>
# how often resource detection should be refreshed; if unset, detection runs only once at startup
refresh_interval: <duration>
# [DEPRECATED] When included, only attributes in the list will be appended. Applies to all detectors.
attributes: [ <string> ]
```
@@ -880,6 +882,18 @@ resourcedetection:
enabled: false
```

### Using the `refresh_interval` parameter

The `refresh_interval` option allows resource attributes to be periodically refreshed without restarting the Collector.

**Important considerations:**

- **Latency**: Newly detected resource attribute values are applied only after the next refresh cycle completes, so changes can take up to `refresh_interval` to propagate.
- **Metric cardinality**: Changes to resource attributes create new metric time series, which can significantly increase storage costs and query complexity.
- **Performance impact**: Each refresh re-runs all configured detectors. Intervals below 5 minutes can increase CPU and memory usage. There is no enforced minimum, but intervals below 1 minute are strongly discouraged.

**Recommendation**: In most environments, a single resource detection at startup is sufficient. Enable periodic refresh only when resource attributes are expected to change during the Collector's lifetime (e.g., Kubernetes pod labels, cloud instance tags), as in the example below.
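
A minimal configuration sketch showing periodic refresh; the detector list and the 10-minute interval are illustrative choices, not defaults:

```yaml
processors:
  resourcedetection:
    detectors: [system, ec2]
    override: false
    # Re-run all configured detectors every 10 minutes.
    # Leave refresh_interval unset to detect only once at startup.
    refresh_interval: 10m
```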

## Ordering

Note that if multiple detectors are inserting the same attribute name, the first detector to insert wins. For example if you had `detectors: [eks, ec2]` then `cloud.platform` will be `aws_eks` instead of `ec2`. The below ordering is recommended.
5 changes: 5 additions & 0 deletions processor/resourcedetectionprocessor/config.go
@@ -4,6 +4,8 @@
package resourcedetectionprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor"

import (
"time"

"go.opentelemetry.io/collector/config/confighttp"

"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
@@ -49,6 +51,9 @@ type Config struct {
// If a supplied attribute is not a valid attribute of a supplied detector it will be ignored.
// Deprecated: Please use detector's resource_attributes config instead
Attributes []string `mapstructure:"attributes"`
// If > 0, periodically re-run detection for all configured detectors.
// When 0 (default), no periodic refresh occurs.
RefreshInterval time.Duration `mapstructure:"refresh_interval"`
}

// DetectorConfig contains user-specified configurations unique to all individual detectors
10 changes: 10 additions & 0 deletions processor/resourcedetectionprocessor/config_test.go
@@ -133,6 +133,16 @@ func TestLoadConfig(t *testing.T) {
DetectorConfig: resourceAttributesConfig,
},
},
{
id: component.NewIDWithName(metadata.Type, "refresh"),
expected: &Config{
Detectors: []string{"system"},
ClientConfig: cfg,
Override: false,
DetectorConfig: detectorCreateDefaultConfig(),
RefreshInterval: 5 * time.Second,
},
},
{
id: component.NewIDWithName(metadata.Type, "invalid"),
errorMessage: "hostname_sources contains invalid value: \"invalid_source\"",
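
The expected struct above implies a `resourcedetection/refresh` entry in the test's config file along these lines; the testdata file is not part of this diff, so this YAML is an inference (HTTP client settings elided):

```yaml
resourcedetection/refresh:
  detectors: [system]
  override: false
  refresh_interval: 5s
```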
28 changes: 19 additions & 9 deletions processor/resourcedetectionprocessor/factory.go
@@ -109,11 +109,12 @@ func (*factory) Type() component.Type {

func createDefaultConfig() component.Config {
return &Config{
Detectors: []string{env.TypeStr},
ClientConfig: defaultClientConfig(),
Override: true,
Attributes: nil,
DetectorConfig: detectorCreateDefaultConfig(),
Detectors: []string{env.TypeStr},
ClientConfig: defaultClientConfig(),
Override: true,
Attributes: nil,
DetectorConfig: detectorCreateDefaultConfig(),
RefreshInterval: 0,
// TODO: Once issue(https://github.com/open-telemetry/opentelemetry-collector/issues/4001) gets resolved,
// Set the default value of 'hostname_source' here instead of 'system' detector
}
@@ -143,7 +144,9 @@ func (f *factory) createTracesProcessor(
nextConsumer,
rdp.processTraces,
processorhelper.WithCapabilities(consumerCapabilities),
processorhelper.WithStart(rdp.Start))
processorhelper.WithStart(rdp.Start),
processorhelper.WithShutdown(rdp.Shutdown),
)
}

func (f *factory) createMetricsProcessor(
@@ -164,7 +167,9 @@ func (f *factory) createMetricsProcessor(
nextConsumer,
rdp.processMetrics,
processorhelper.WithCapabilities(consumerCapabilities),
processorhelper.WithStart(rdp.Start))
processorhelper.WithStart(rdp.Start),
processorhelper.WithShutdown(rdp.Shutdown),
)
}

func (f *factory) createLogsProcessor(
@@ -185,7 +190,9 @@ func (f *factory) createLogsProcessor(
nextConsumer,
rdp.processLogs,
processorhelper.WithCapabilities(consumerCapabilities),
processorhelper.WithStart(rdp.Start))
processorhelper.WithStart(rdp.Start),
processorhelper.WithShutdown(rdp.Shutdown),
)
}

func (f *factory) createProfilesProcessor(
@@ -206,7 +213,9 @@ func (f *factory) createProfilesProcessor(
nextConsumer,
rdp.processProfiles,
xprocessorhelper.WithCapabilities(consumerCapabilities),
xprocessorhelper.WithStart(rdp.Start))
xprocessorhelper.WithStart(rdp.Start),
xprocessorhelper.WithShutdown(rdp.Shutdown),
)
}

func (f *factory) getResourceDetectionProcessor(
@@ -226,6 +235,7 @@ func (f *factory) getResourceDetectionProcessor(
provider: provider,
override: oCfg.Override,
httpClientSettings: oCfg.ClientConfig,
refreshInterval: oCfg.RefreshInterval,
telemetrySettings: params.TelemetrySettings,
}, nil
}
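
The factory now registers `rdp.Shutdown` alongside `rdp.Start`, but the processor file itself is not included in this diff. Below is a plausible sketch of how `Start`/`Shutdown` could hand off to the provider's refresh loop; the struct layout, the hard-coded client timeout, and both method bodies are assumptions for illustration, not the PR's actual implementation:

```go
package resourcedetectionprocessor

import (
	"context"
	"net/http"
	"time"

	"go.opentelemetry.io/collector/component"

	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourcedetectionprocessor/internal"
)

// Sketch only: field names mirror the factory wiring above; everything else is assumed.
type resourceDetectionProcessor struct {
	provider        *internal.ResourceProvider
	refreshInterval time.Duration
	// override, httpClientSettings, telemetrySettings omitted for brevity
}

// Start runs detection once, then launches the periodic refresh loop.
func (rdp *resourceDetectionProcessor) Start(ctx context.Context, _ component.Host) error {
	// Any HTTP client with a non-zero timeout works for this sketch; the real
	// processor would build one from its confighttp settings.
	client := &http.Client{Timeout: 5 * time.Second}

	// Populate the first snapshot synchronously so early batches already carry attributes.
	if _, _, err := rdp.provider.Refresh(ctx, client); err != nil {
		return err
	}

	// Begin periodic refresh; StartRefreshing is a no-op when the interval is <= 0.
	rdp.provider.StartRefreshing(rdp.refreshInterval, client)
	return nil
}

// Shutdown stops the refresh goroutine so collector shutdown does not leak it.
func (rdp *resourceDetectionProcessor) Shutdown(context.Context) error {
	rdp.provider.StopRefreshing()
	return nil
}
```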
159 changes: 128 additions & 31 deletions processor/resourcedetectionprocessor/internal/resourcedetection.go
@@ -98,8 +98,13 @@ type ResourceProvider struct {
timeout time.Duration
detectors []Detector
detectedResource *resourceResult
once sync.Once
mu sync.RWMutex
attributesToKeep map[string]struct{}

// Refresh loop control
refreshInterval time.Duration
stopCh chan struct{}
wg sync.WaitGroup
}

type resourceResult struct {
@@ -114,73 +119,113 @@ func NewResourceProvider(logger *zap.Logger, timeout time.Duration, attributesTo
timeout: timeout,
detectors: detectors,
attributesToKeep: attributesToKeep,
refreshInterval: 0, // No periodic refresh by default
}
}

func (p *ResourceProvider) Get(ctx context.Context, client *http.Client) (resource pcommon.Resource, schemaURL string, err error) {
p.once.Do(func() {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, client.Timeout)
defer cancel()
p.detectResource(ctx, client.Timeout)
})
func (p *ResourceProvider) Get(_ context.Context, _ *http.Client) (pcommon.Resource, string, error) {
p.mu.RLock()
defer p.mu.RUnlock()

resource := pcommon.NewResource()
schemaURL := ""
var err error

return p.detectedResource.resource, p.detectedResource.schemaURL, p.detectedResource.err
if p.detectedResource != nil {
resource = p.detectedResource.resource
schemaURL = p.detectedResource.schemaURL
err = p.detectedResource.err
}

return resource, schemaURL, err
}

func (p *ResourceProvider) detectResource(ctx context.Context, timeout time.Duration) {
p.detectedResource = &resourceResult{}
// Refresh recomputes the resource, replacing any previous result.
func (p *ResourceProvider) Refresh(ctx context.Context, client *http.Client) (pcommon.Resource, string, error) {
ctx, cancel := context.WithTimeout(ctx, client.Timeout)
defer cancel()

res, schemaURL, err := p.detectResource(ctx)
p.mu.Lock()
defer p.mu.Unlock()
prev := p.detectedResource

// Check whether we have a previous successful snapshot
hadPrevSuccess := prev != nil && prev.err == nil && !IsEmptyResource(prev.resource)

// Keep the last good snapshot if the refresh errored OR returned an empty resource.
if hadPrevSuccess && (err != nil || IsEmptyResource(res)) {
p.logger.Warn("resource refresh yielded empty or error; keeping previous snapshot", zap.Error(err))
// Return nil error since we're successfully returning the cached resource
return prev.resource, prev.schemaURL, nil
}

// Otherwise, accept the new snapshot.
p.detectedResource = &resourceResult{resource: res, schemaURL: schemaURL, err: err}
return res, schemaURL, err
}

func (p *ResourceProvider) detectResource(ctx context.Context) (pcommon.Resource, string, error) {
res := pcommon.NewResource()
mergedSchemaURL := ""
var joinedErr error
successes := 0

p.logger.Info("began detecting resource information")

resultsChan := make([]chan resourceResult, len(p.detectors))
for i, detector := range p.detectors {
resultsChan[i] = make(chan resourceResult)
go func(detector Detector) {
ch := make(chan resourceResult, 1)
resultsChan[i] = ch

go func(detector Detector, ch chan resourceResult) {
sleep := backoff.ExponentialBackOff{
InitialInterval: 1 * time.Second,
RandomizationFactor: 1.5,
Multiplier: 2,
MaxInterval: timeout,
}
sleep.Reset()
var err error
var r pcommon.Resource
var schemaURL string

for {
r, schemaURL, err = detector.Detect(ctx)
r, schemaURL, err := detector.Detect(ctx)
if err == nil {
resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: nil}
ch <- resourceResult{resource: r, schemaURL: schemaURL}
return
}

p.logger.Warn("failed to detect resource", zap.Error(err))

timer := time.NewTimer(sleep.NextBackOff())
next := sleep.NextBackOff()
if next == backoff.Stop {
ch <- resourceResult{err: err}
return
}

timer := time.NewTimer(next)
select {
case <-timer.C:
fmt.Println("Retrying fetching data...")
case <-ctx.Done():
p.logger.Warn("Context was cancelled: %w", zap.Error(ctx.Err()))
resultsChan[i] <- resourceResult{resource: r, schemaURL: schemaURL, err: err}
p.logger.Warn("context was cancelled", zap.Error(ctx.Err()))
timer.Stop()
ch <- resourceResult{err: err}
return
case <-timer.C:
// retry
}
}
}(detector)
}(detector, ch)
}

for _, ch := range resultsChan {
result := <-ch
if result.err != nil {
if allowErrorPropagationFeatureGate.IsEnabled() {
p.detectedResource.err = errors.Join(p.detectedResource.err, result.err)
joinedErr = errors.Join(joinedErr, result.err)
}
} else {
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL)
MergeResource(res, result.resource, false)
continue
}
successes++
mergedSchemaURL = MergeSchemaURL(mergedSchemaURL, result.schemaURL)
MergeResource(res, result.resource, false)
}

droppedAttributes := filterAttributes(res.Attributes(), p.attributesToKeep)
@@ -190,8 +235,22 @@ func (p *ResourceProvider) detectResource(ctx context.Context, timeout time.Dura
p.logger.Info("dropped resource information", zap.Strings("resource keys", droppedAttributes))
}

p.detectedResource.resource = res
p.detectedResource.schemaURL = mergedSchemaURL
// Partial success is acceptable - return merged resources from successful detectors.
// Only propagate an error if ALL detectors failed, ensuring graceful degradation.
if successes == 0 {
if allowErrorPropagationFeatureGate.IsEnabled() {
if joinedErr == nil {
joinedErr = errors.New("resource detection failed: no detectors succeeded")
}
return pcommon.NewResource(), "", joinedErr
}

// If the feature gate is disabled, do NOT propagate the error.
// Return an empty resource and nil error so the processor can still start.
p.logger.Warn("resource detection failed but error propagation is disabled")
return pcommon.NewResource(), "", nil
}
return res, mergedSchemaURL, nil
}

func MergeSchemaURL(currentSchemaURL, newSchemaURL string) string {
@@ -244,3 +303,41 @@ func MergeResource(to, from pcommon.Resource, overrideTo bool) {
func IsEmptyResource(res pcommon.Resource) bool {
return res.Attributes().Len() == 0
}

// StartRefreshing begins periodic resource refresh if refreshInterval > 0
func (p *ResourceProvider) StartRefreshing(refreshInterval time.Duration, client *http.Client) {
p.refreshInterval = refreshInterval
if p.refreshInterval <= 0 {
return
}

p.stopCh = make(chan struct{})
p.wg.Add(1)
go p.refreshLoop(client)
}

// StopRefreshing stops the periodic refresh goroutine
func (p *ResourceProvider) StopRefreshing() {
if p.stopCh != nil {
close(p.stopCh)
p.wg.Wait()
}
}

func (p *ResourceProvider) refreshLoop(client *http.Client) {
defer p.wg.Done()
ticker := time.NewTicker(p.refreshInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
_, _, err := p.Refresh(context.Background(), client)
if err != nil {
p.logger.Warn("resource refresh failed", zap.Error(err))
}
case <-p.stopCh:
return
}
}
}
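
Taken together, the new methods form a small lifecycle: `Refresh` computes and stores a snapshot, `StartRefreshing` keeps it up to date in the background, `Get` hands out the cached value, and `StopRefreshing` tears the loop down. A minimal driver sketch follows; `runWithRefresh` is illustrative and not part of the PR, and the client must carry a non-zero `Timeout` because `Refresh` derives its context deadline from it:

```go
package internal

import (
	"context"
	"net/http"
	"time"
)

// runWithRefresh shows the intended call order for the refresh API on an
// already-constructed ResourceProvider (constructor arguments elided).
func runWithRefresh(ctx context.Context, p *ResourceProvider, client *http.Client, interval time.Duration) error {
	// Populate the first snapshot synchronously so Get never returns an
	// empty resource while the background loop is still warming up.
	if _, _, err := p.Refresh(ctx, client); err != nil {
		return err
	}

	// Launch the background loop and make sure it is stopped on exit so the
	// goroutine does not outlive its owner.
	p.StartRefreshing(interval, client)
	defer p.StopRefreshing()

	// Get only takes a read lock and returns the cached snapshot; after a
	// failed refresh it keeps serving the last good one.
	_, _, err := p.Get(ctx, client)
	return err
}
```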